Best Python code snippet using lisa_python
test_clash.py
Source:test_clash.py  
...61        self.subscriber.subscription_path.side_effect = (62            lambda project, name: "{}/{}".format(project, name)63        )64        self.logging_client = MagicMock()65    def get_compute_client(self):66        return self.compute67    def get_publisher(self):68        return self.publisher69    def get_subscriber(self):70        return self.subscriber71    def get_logging(self, project=None):72        return self.logging_client73class TestMachineConfig:74    def setup(self):75        self.gcloud = CloudSdkStub()76        self.cloud_init = clash.CloudInitConfig("_", "", TEST_JOB_CONFIG)77    def test_config_contains_vmname(self):78        manifest = clash.MachineConfig(79            self.gcloud.get_compute_client(), "myvm", self.cloud_init, TEST_JOB_CONFIG80        )81        machine_config = manifest.to_dict()82        assert machine_config["name"] == "myvm"83    def test_config_contains_cloud_init_config(self):84        config = clash.MachineConfig(85            self.gcloud.get_compute_client(),86            "_",87            clash.CloudInitConfig("myname", "_", TEST_JOB_CONFIG),88            TEST_JOB_CONFIG,89        )90        machine_config = config.to_dict()91        assert machine_config["metadata"]["items"][0]["key"] == "user-data"92        cloud_init = yaml.safe_load(machine_config["metadata"]["items"][0]["value"])93        assert cloud_init["users"][0]["name"] == "clash"94    def test_config_contains_machine_type(self):95        manifest = clash.MachineConfig(96            self.gcloud.get_compute_client(), "_", self.cloud_init, TEST_JOB_CONFIG97        )98        machine_config = manifest.to_dict()99        assert machine_config["machineType"] == "n1-standard-1"100    def test_automatic_restart_is_always_false(self):101        manifest = clash.MachineConfig(102            self.gcloud.get_compute_client(), "_", self.cloud_init, TEST_JOB_CONFIG103        )104        machine_config = manifest.to_dict()105        assert not 
machine_config["scheduling"]["automaticRestart"]106    def test_config_contains_preemptible_flag_if_set_true(self):107        job_config = copy.deepcopy(TEST_JOB_CONFIG)108        job_config["preemptible"] = True109        manifest = clash.MachineConfig(110            self.gcloud.get_compute_client(), "_", self.cloud_init, job_config111        )112        machine_config = manifest.to_dict()113        assert machine_config["scheduling"]["preemptible"]114    def test_config_contains_preemptible_flag_if_set_false(self):115        job_config = copy.deepcopy(TEST_JOB_CONFIG)116        job_config["preemptible"] = False117        manifest = clash.MachineConfig(118            self.gcloud.get_compute_client(), "_", self.cloud_init, job_config119        )120        machine_config = manifest.to_dict()121        assert not machine_config["scheduling"]["preemptible"]122    def test_config_contains_custom_service_account(self):123        job_config = copy.deepcopy(TEST_JOB_CONFIG)124        job_config["service_account"] = "myaccount@foo.bar"125        manifest = clash.MachineConfig(126            self.gcloud.get_compute_client(), "_", self.cloud_init, job_config127        )128        machine_config = manifest.to_dict()129        assert machine_config["serviceAccounts"][0]["email"] == "myaccount@foo.bar"130    def test_config_contains_default_service_account_by_default(self):131        job_config = copy.deepcopy(TEST_JOB_CONFIG)132        manifest = clash.MachineConfig(133            self.gcloud.get_compute_client(), "_", self.cloud_init, job_config134        )135        machine_config = manifest.to_dict()136        assert machine_config["serviceAccounts"][0]["email"] == "default"137    def test_config_contains_labels(self):138        job_config = copy.deepcopy(TEST_JOB_CONFIG)139        job_config["labels"] = {"customer": "dummy"}140        manifest = clash.MachineConfig(141            self.gcloud.get_compute_client(), "_", self.cloud_init, job_config142        )143        
machine_config = manifest.to_dict()144        assert machine_config["labels"] == {"customer": "dummy"}145    def test_config_empty_labels(self):146        job_config = copy.deepcopy(TEST_JOB_CONFIG)147        manifest = clash.MachineConfig(148            self.gcloud.get_compute_client(), "_", self.cloud_init, job_config149        )150        machine_config = manifest.to_dict()151        assert machine_config["labels"] == {}152def test_argument_to_script_with_whitespace():153    res = clash.translate_args_to_script(args=["echo", "hello world"])154    assert res == "echo 'hello world'"155def test_argument_to_script():156    res = clash.translate_args_to_script(args=["echo", "world"])157    assert res == "echo world"158class TestJob:159    def setup(self):160        self.gcloud = CloudSdkStub()161    @patch("uuid.uuid1")162    def test_creates_job(self, mock_uuid_call):163        mock_uuid_call.return_value = 1234164        job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)165        assert "clash-job-1234" == job.name166    @patch("uuid.uuid1")167    def test_creates_job_with_name_prefix(self, mock_uuid_call):168        mock_uuid_call.return_value = 1234169        job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud, name_prefix="foo")170        assert "foo-clash-job-1234" == job.name171    def test_running_a_job_creates_an_instance_template(self):172        job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)173        job.run(args=[])174        self.gcloud.get_compute_client().instanceTemplates.return_value.insert.return_value.execute.assert_called()175    def test_running_a_job_creates_a_managed_instance_group(self):176        job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)177        job.run(args=[])178        self.gcloud.get_compute_client().instanceGroupManagers.return_value.insert.return_value.execute.assert_called()179    def test_deletes_instance_template_after_job_is_complete(self):180        
self.gcloud.get_compute_client().instanceGroups.return_value.list.return_value.execute.return_value = {181            "items": [{"name": "anothergroup"}]182        }183        with clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud) as job:184            job.run(args=[])185        self.gcloud.get_compute_client().instanceTemplates.return_value.delete.return_value.execute.assert_called()186    def test_removes_subscription_if_job_creation_failed(self):187        self.gcloud.get_compute_client().instanceGroupManagers.return_value.insert.return_value.execute.side_effect = Exception(188            "Failure!"189        )190        job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)191        with pytest.raises(Exception) as e_info:192            job.run(args=[])193        self.gcloud.get_subscriber().delete_subscription.assert_called_with(194            f"{TEST_JOB_CONFIG['project_id']}/{job.name}"195        )196    def test_removes_topic_if_job_creation_failed(self):197        self.gcloud.get_compute_client().instanceGroupManagers.return_value.insert.return_value.execute.side_effect = Exception(198            "Failure!"199        )200        job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)201        with pytest.raises(Exception) as e_info:202            job.run(args=[])203        self.gcloud.get_publisher().delete_topic.assert_called_with(204            f"{TEST_JOB_CONFIG['project_id']}/{job.name}"205        )206    def test_running_a_job_creates_a_topic_path(self):207        job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)208        job.run(args=[])209        self.gcloud.get_publisher().topic_path.assert_called_with(210            TEST_JOB_CONFIG["project_id"], job.name211        )...compute.py
Source:compute.py  
...
from oslo_log import log as logging

LOG = logging.getLogger(__name__)
bp = Blueprint('compute', __name__)
CONF = cfg.CONF

# Mock data file, relative to the parent of this module's directory.
_MOCK_JSON = "tools/compute-mock-data.json"


def _load_mock_data(section):
    """Return one top-level section of the compute mock-data JSON file."""
    json_file = os.path.join(
        os.path.dirname(os.path.dirname(__file__)), _MOCK_JSON)
    with open(json_file) as f:
        return json.load(f)[section]


def _list_services_or_abort(compute_client, hostname):
    """Return the compute services for ``hostname``; abort with 410 if none."""
    compute_services = compute_client.services.list(host=hostname)
    if not compute_services:
        msg = 'No compute service for %s' % hostname
        LOG.error(msg)
        abort(410, msg)
    return compute_services


def get_compute_client(req):
    """Build a Nova client authenticated with the request's X-Auth-Token.

    :param req: incoming request; only its ``X-Auth-Token`` header is read
    :return: a novaclient ``Client`` using the internal endpoint
    Any failure is logged and turned into an HTTP 500 via ``abort``.
    """
    try:
        loader = loading.get_plugin_loader('v3token')
        auth = loader.load_from_options(
            auth_url=CONF.keystone_authtoken.auth_url,
            token=req.headers.get('X-Auth-Token'),
            project_name=CONF.keystone_authtoken.project_name,
            project_domain_name=CONF.keystone_authtoken.project_domain_name
        )
        sess = session.Session(auth=auth,
                               verify=not CONF.keystone_authtoken.insecure)
        compute_client = novaClient.Client(
            # api version for live_migrate with block_migration='auto'
            '2.25',
            endpoint_type="internalURL",
            session=sess
        )
        return compute_client
    except Exception as e:
        LOG.error(e)
        abort(500, 'Failed to get compute novaclient')


def complete_with_errors_response(msg, contents):
    """Return a JSON response with status 500 describing a partial failure.

    :param msg: summary placed under ``error_msg``
    :param contents: details placed under ``contents``
    """
    response = jsonify({'error_msg': msg, 'contents': contents})
    response.status_code = 500
    return response


@bp.route("/api/v2/compute/services/<hostname>", methods=['GET'])
@policy.enforce('lifecycle:get_compute')
def compute_services_status(hostname):
    """Get the compute services status for a compute host

        Each service binary maps to ``true`` when its status is
        ``enabled`` and ``false`` otherwise.

        .. :quickref: Compute; Get the compute services status

        **Example Request**:

        .. sourcecode:: http

           GET /api/v2/compute/services/<hostname> HTTP/1.1
           Content-Type: application/json

        **Example Response**:

        .. sourcecode:: http

            HTTP/1.1 200 OK

            {
                "nova-compute": true
            }
    """
    # mock for getting nova service status for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('compute_services_status'))

    compute_client = get_compute_client(request)
    compute_services = _list_services_or_abort(compute_client, hostname)

    services = dict()
    for service in compute_services:
        binary = getattr(service, 'binary', None)
        if binary:
            services[binary] = \
                getattr(service, 'status', None) == 'enabled'
    return jsonify(services)


@bp.route("/api/v2/compute/services/<hostname>/disable", methods=['PUT'])
@policy.enforce('lifecycle:update_compute')
def compute_disable_services(hostname):
    """Disable the compute services for a compute host

        .. :quickref: Compute; Disable the compute services

        **Example Request**:

        .. sourcecode:: http

           PUT /api/v2/compute/services/<hostname>/disable HTTP/1.1
           Content-Type: application/json

        **Example Response**:

        .. sourcecode:: http

            HTTP/1.1 200 OK

            [{
                "binary": "nova-compute",
                "id": 1
            }]
    """
    # mock for running nova disable service for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('disable_compute_services'))

    compute_client = get_compute_client(request)
    compute_services = _list_services_or_abort(compute_client, hostname)

    failed = []
    disabled = []
    for service in compute_services:
        binary = getattr(service, 'binary', '')
        service_id = getattr(service, 'id')
        status = getattr(service, 'status', '')
        if status == 'enabled':
            try:
                compute_client.services.disable(hostname, binary)
                disabled.append({'id': service_id, 'binary': binary})
            except Exception as ex:
                failed.append(
                    {'id': service_id, 'binary': binary, 'error': str(ex)})
                # Pass the values as logging arguments. The previous
                # "'...' + '...' % (a, b, c)" form applied % to the second
                # literal only and raised TypeError inside this handler.
                LOG.error(
                    'Failed to disable compute service for %s id = %s '
                    'binary = %s', hostname, service_id, binary)
                LOG.error(ex)
        else:
            # already disabled, will not call
            disabled.append({'id': service_id, 'binary': binary})

    if failed:
        return complete_with_errors_response(
            'Completed disabling compute services with errors',
            {'failed': failed, 'disabled': disabled})
    return jsonify(disabled)


@bp.route("/api/v2/compute/services/<hostname>/enable", methods=['PUT'])
@policy.enforce('lifecycle:update_compute')
def compute_enable_services(hostname):
    """Enable the compute services for a compute host

        .. :quickref: Compute; Enable the compute services

        **Example Request**:

        .. sourcecode:: http

           PUT /api/v2/compute/services/<hostname>/enable HTTP/1.1
           Content-Type: application/json

        **Example Response**:

        .. sourcecode:: http

            HTTP/1.1 200 OK

            [{
                "binary": "nova-compute",
                "id": 1
            }]
    """
    # mock for running nova enable service for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('enable_compute_services'))

    compute_client = get_compute_client(request)
    compute_services = _list_services_or_abort(compute_client, hostname)

    failed = []
    enabled = []
    for service in compute_services:
        binary = getattr(service, 'binary', '')
        service_id = getattr(service, 'id')
        status = getattr(service, 'status', '')
        if status == 'disabled':
            try:
                compute_client.services.enable(hostname, binary)
                enabled.append({'id': service_id, 'binary': binary})
            except Exception as ex:
                failed.append(
                    {'id': service_id, 'binary': binary, 'error': str(ex)})
                # Logging arguments instead of the broken "+ ... %" form,
                # which raised TypeError inside this handler.
                LOG.error(
                    'Failed to enable compute service for %s id = %s '
                    'binary = %s', hostname, service_id, binary)
                LOG.error(ex)
        else:
            # already enabled, will not call
            enabled.append({'id': service_id, 'binary': binary})

    if failed:
        return complete_with_errors_response(
            'Completed enabling compute services with errors',
            {'failed': failed, 'enabled': enabled})
    return jsonify(enabled)


@bp.route("/api/v2/compute/services/<hostname>", methods=['DELETE'])
@policy.enforce('lifecycle:update_compute')
def compute_delete_services(hostname):
    """Delete the compute services for a compute host

        .. :quickref: Compute; Delete the compute services

        **Example Request**:

        .. sourcecode:: http

           DELETE /api/v2/compute/services/<hostname> HTTP/1.1
           Content-Type: application/json

        **Example Response**:

        .. sourcecode:: http

            HTTP/1.1 200 OK

            [{
                "binary": "nova-compute",
                "id": 1
            }]
    """
    # mock for running nova delete service for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('delete_compute_services'))

    compute_client = get_compute_client(request)
    compute_services = _list_services_or_abort(compute_client, hostname)

    failed = []
    deleted = []
    for service in compute_services:
        binary = getattr(service, 'binary', '')
        service_id = getattr(service, 'id')
        try:
            compute_client.services.delete(service_id)
            deleted.append({'id': service_id, 'binary': binary})
        except Exception as ex:
            failed.append(
                {'id': service_id, 'binary': binary, 'error': str(ex)})
            LOG.error(
                'Failed to delete compute service for %s id = %s binary = %s',
                hostname, service_id, binary)
            LOG.error(ex)

    if failed:
        return complete_with_errors_response(
            'Completed deleting compute services with errors',
            {'failed': failed, 'deleted': deleted})
    return jsonify(deleted)


@bp.route("/api/v2/compute/aggregates/<hostname>", methods=['DELETE'])
@policy.enforce('lifecycle:update_compute')
def compute_delete_aggregates(hostname):
    """Delete the aggregates for a compute host

        The host is removed from every aggregate that lists it; the
        aggregates themselves are not deleted.

        .. :quickref: Compute; Delete aggregates for a compute host

        **Example Request**:

        .. sourcecode:: http

           DELETE /api/v2/compute/aggregates/<hostname> HTTP/1.1
           Content-Type: application/json

        **Example Response**:

        .. sourcecode:: http

            HTTP/1.1 200 OK

            [{
                "availability_zone": "test-az",
                "id": 3,
                "name": "agg_group3"
            }, {
                "availability_zone": null,
                "id": 1,
                "name": "agg_group1"
            }, {
                "availability_zone": null,
                "id": 2,
                "name": "agg_group2"
            }]
    """
    # mock for running nova delete aggregates for a compute host
    # mock contains partial failure
    if CONF.testing.use_mock:
        return complete_with_errors_response(
            'Completed deleting aggregates with errors',
            _load_mock_data('delete_aggregates'))

    compute_client = get_compute_client(request)

    # list all the aggregates
    compute_aggregates = compute_client.aggregates.list()
    if not compute_aggregates:
        msg = 'No aggregates found for %s ' % hostname
        LOG.info(msg)
        abort(410, msg)

    # get details so we can decide which one we need to
    # remove compute host
    aggregates = []
    for aggr in compute_aggregates:
        details = compute_client.aggregates.get(aggr)
        aggr_id = getattr(aggr, 'id')
        name = getattr(aggr, 'name')
        az = getattr(aggr, 'availability_zone')
        if hostname in getattr(details, 'hosts', []):
            aggregates.append({
                'id': aggr_id, 'name': name, 'availability_zone': az})

    if not aggregates:
        msg = 'No aggregates found for %s ' % hostname
        LOG.info(msg)
        abort(410, msg)

    failed = []
    deleted = []
    for aggr in aggregates:
        aggr_id = aggr['id']
        name = aggr['name']
        az = aggr['availability_zone']
        try:
            compute_client.aggregates.remove_host(aggr_id, hostname)
            deleted.append(
                {'id': aggr_id, 'name': name, 'availability_zone': az})
        except Exception as ex:
            failed.append({
                'id': aggr_id, 'name': name,
                'availability_zone': az, 'error': str(ex)})
            LOG.error(
                'Failed to delete aggregate for %s id = %s name = %s '
                'availability_zone = %s', hostname, aggr_id, name, az)
            LOG.error(ex)

    if failed:
        return complete_with_errors_response(
            'Completed deleting aggregates with errors',
            {'failed': failed, 'deleted': deleted})
    return jsonify(deleted)


@bp.route(
    "/api/v2/compute/instances/<src_hostname>/<target_hostname>/migrate",
    methods=['PUT'])
@policy.enforce('lifecycle:update_compute')
def compute_migrate_instances(src_hostname, target_hostname):
    """Migrate instances of a compute host to another compute host

        .. :quickref: Compute; Live migrate instances of a compute host

        **Example Request**:

        .. sourcecode:: http

           PUT
           /api/v2/compute/instances/<src_hostname>/<target_hostname>/migrate
           HTTP/1.1
           Content-Type: application/json

        **Example Response**:

        .. sourcecode:: http

            HTTP/1.1 200 OK

            [{
                "id": "8279e65d-6e87-4a50-b789-96edd753fbb2",
                "name": "test3"
            }, {
                "id": "1d51f18f-27fd-4c34-a0aa-c07a5e9462e7",
                "name": "test2"
            }]
    """
    # mock for running  nova instance live migrating for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('migrate_instances'))

    compute_client = get_compute_client(request)
    search_opts = {
        'all_tenants': 1,  # all tenants
        'host': src_hostname
    }
    instances = compute_client.servers.list(search_opts=search_opts)
    if not instances:
        msg = 'No instances found for %s' % src_hostname
        LOG.info(msg)
        abort(410, msg)

    migrating = []  # list of migrating instance ids and names
    failed = []  # list of failed instance ids, names and errors
    for inst in instances:
        inst_id = getattr(inst, 'id')
        name = getattr(inst, 'name')
        try:
            compute_client.servers.live_migrate(
                inst_id, target_hostname, block_migration='auto')
            migrating.append({'id': inst_id, 'name': name})
        except Exception as ex:
            failed.append({'id': inst_id, 'name': name, 'error': str(ex)})
            LOG.error(
                'Failed to start migrating instance of %s id = %s name = %s',
                src_hostname, inst_id, name)
            LOG.error(ex)

    if failed:
        return complete_with_errors_response(
            'Completed migrating instances with errors',
            {'failed': failed, 'migrating': migrating})
    return jsonify(migrating)


@bp.route("/api/v2/compute/instances/<hostname>", methods=['GET'])
@policy.enforce('lifecycle:get_compute')
def compute_get_instances(hostname):
    """Return instances of a compute host

        .. :quickref: Compute; Get instances of a compute host

        **Example Request**:

        .. sourcecode:: http

           GET /api/v2/compute/instances/<hostname> HTTP/1.1
           Content-Type: application/json

        **Example Response**:

        .. sourcecode:: http

           HTTP/1.1 200 OK

            [{
                "id": "ab3622db-648b-435d-b9af-f279e57bd8c9",
                "name": "test4",
                "status": "ACTIVE"
            }]
    """
    # mock for running  nova instance list indicating whether all instances for
    # a compute host are migrated
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('get_instances'))

    compute_client = get_compute_client(request)
    try:
        search_opts = {
            'all_tenants': 1,  # all tenants
            'host': hostname
        }
        instances = compute_client.servers.list(search_opts=search_opts)
        ret_instances = [
            {'id': getattr(inst, 'id'), 'name': getattr(inst, 'name'),
             'status': getattr(inst, 'status')} for inst in instances
        ]
        return jsonify(ret_instances)
    except Exception as e:
        msg = \
            'Failed to get instances for compute host %s' % hostname
        # NOTE(review): the excerpt is truncated here; the remainder of this
        # handler is not shown and has been left untouched.
...
main.py
Source:main.py  
...4from google.oauth2 import service_account5from googleapiclient import errors6class GCPSDKObject():7    SERVICE_ACCOUNT_FILE = '/path/to/service-account-file.json'8    def get_compute_client(self):9        credentials = service_account.Credentials.from_service_account_file(10            self.SERVICE_ACCOUNT_FILE)11        service = discovery.build('compute', 'v1', credentials=credentials)12        return service13    def backend_buckets_list(self, project_name: str):14        """15        ååºGCSå端16        """17        service = self.get_compute_client()18        result = list()19        request = service.backendBuckets().list(project=project_name)20        while request is not None:21            try:22                response = request.execute()23            except errors.HttpError as err:24                print(err._get_reason())25                raise err26            for backend_service in response['items']:27                pprint(backend_service)28                result.append(backend_service)29            request = service.backendServices().list_next(previous_request=request, previous_response=response)30        return result31    def backend_buckets_create(self, project_name: str, backend_bucket_name: str, gcs_bucket_name: str):32        """33        å建 backend GCS å端34        :param project_name:  project_id æè
 project_name35        :param backend_bucket_name:  æ°å建ç backend åç§°36        :param gcs_bucket_name:  GCPä¾§ cloud storage ä¸çbucketåç§°ï¼ä¸å®è¦åå®é
çä¸è´37        :return:38        """39        # api: https://cloud.google.com/compute/docs/reference/rest/v1/backendBuckets/insert40        service = self.get_compute_client()41        body = {42            "name": backend_bucket_name,43            "bucketName": gcs_bucket_name,44            "enableCdn": True45        }46        request = service.backendBuckets().insert(project=project_name, body=body)47        try:48            response = request.execute()49            pprint(response)50            return response51        except errors.HttpError as err:52            # Something went wrong, print out some information.53            print('There was an error creating the model. Check the details:')54            print(err._get_reason())55            raise err56    def backend_gcs_url_maps_list(self, project_name: str):57        service = self.get_compute_client()58        result = list()59        request = service.urlMaps().list(project=project_name)60        while request is not None:61            try:62                response = request.execute()63            except errors.HttpError as err:64                # Something went wrong, print out some information.65                print('There was an error creating the model. Check the details:')66                print(err._get_reason())67                raise err68            for url_map in response['items']:69                pprint(url_map)70                result.append(url_map)71            request = service.urlMaps().list_next(previous_request=request, previous_response=response)72        return result73    def backend_gcs_url_maps_create(self, project_name: str, url_map_name: str, backend_or_service_name: str,74                                    is_bucket=True):75        """76         é
ç½®GCSå端 ç½åæ å°77        :param project_name:78        :param url_map_name:79        :param backend_or_service_name:80        :param is_bucket: default=True81        :return:82        """83        # api: https://cloud.google.com/compute/docs/reference/rest/v1/urlMaps/insert84        service = self.get_compute_client()85        if is_bucket:86            service_url = f"compute/v1/projects/{project_name}/global/backendBuckets/{backend_or_service_name}"87        else:88            service_url = f"compute/v1/projects/{project_name}/global/backendServices/{backend_or_service_name}"89        body = {90            "name": url_map_name,91            "defaultService": service_url,92        }93        request = service.urlMaps().insert(project=project_name, body=body)94        try:95            response = request.execute()96            pprint(response)97            return response98        except errors.HttpError as err:99            print(err._get_reason())100            raise err101    def compute_global_addresses_list(self, project_name: str):102        service = self.get_compute_client()103        result = list()104        request = service.globalAddresses().list(project=project_name)105        while request is not None:106            try:107                response = request.execute()108            except errors.HttpError as err:109                print(err._get_reason())110                raise err111            for address in response['items']:112                pprint(address)113                result.append(address)114            request = service.globalAddresses().list_next(previous_request=request, previous_response=response)115        return result116    def compute_global_addresses_create(self, project_name: str, address_name: str, ip_type: str = "IPV4"):117        """118        å建å
¨å±éæå¤é¨ IP å°å119        :param project_name:120        :param address_name:121        :param ip_type:122        :return:123        """124        service = self.get_compute_client()125        body = {126            "name": address_name,127            "ipVersion": ip_type,128            "addressType": "EXTERNAL"129        }130        request = service.globalAddresses().insert(project=project_name, body=body)131        try:132            response = request.execute()133            pprint(response)134            return response135        except errors.HttpError as err:136            print(err._get_reason())137            raise err138    def target_http_proxies_create(self, project_name: str, proxy_name: str, url_map_name: str):139        """140        å建代ç proxy141        åå»ºç®æ  HTTP 代ç以å°è¯·æ±è·¯ç±å°æ¨çç½åæ å°142        """143        service = self.get_compute_client()144        body = {145            "name": proxy_name,146            "urlMap": url_map_name147        }148        request = service.targetHttpProxies().insert(project=project_name, body=body)149        try:150            response = request.execute()151            pprint(response)152            return response153        except errors.HttpError as err:154            print(err._get_reason())155            raise err156    def target_https_proxies_create(self, project_name: str, proxy_name: str, url_map_name: str, cert_name: str):157        """158        å建代ç proxy 使ç¨sslè¯ä¹¦159        """160        service = self.get_compute_client()161        body = {162            "name": proxy_name,163            "urlMap": url_map_name,164            "sslCertificates": [cert_name]165        }166        request = service.targetHttpsProxies().insert(project=project_name, body=body)167        try:168            response = request.execute()169            pprint(response)170            return response171        except errors.HttpError as err:172            print(err._get_reason())173            raise err174    def 
ssl_certificates_create(self, cert_name: str, certificate: str, private_key: str, description: str = "",175                                project_name: str = "spoton-project"):176        """177        å建è¯ä¹¦178            gcloud compute ssl-certificates list179        """180        body = {"name": cert_name, "certificate": certificate, "privateKey": private_key, "description": description}181        compute = self.get_compute_client()182        request = compute.sslCertificates().insert(project=project_name, body=body)183        try:184            response = request.execute()185            return response186        except errors.HttpError as err:187            print(err._get_reason())188            raise err189    def global_forwarding_rules_create(self, project_name: str, forwarding_rule_name: str, address_name: str, port: int,190                                       proxy_name: str):191        """192        å端é
ç½®193        """194        service = self.get_compute_client()195        body = {196            "name": forwarding_rule_name,197            "IPAddress": address_name,198            "target": proxy_name,199            "ports": [200                str(port)201            ]202        }203        request = service.globalForwardingRules().insert(project=project_name, body=body)204        try:205            response = request.execute()206            return response207        except errors.HttpError as err:208            print(err._get_reason())...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
