Best Python code snippet using localstack_python
test_azure_scanner.py
Source:test_azure_scanner.py  
import json
import os
import tempfile
import unittest
from unittest.mock import patch, ANY

from dragoneye.cloud_scanner.azure.azure_scanner import AzureScanner, AzureCloudScanSettings
from mockito import when, unstub, mock
import dragoneye


class TestAzureScanner(unittest.TestCase):
    """Tests for AzureScanner with every HTTP GET stubbed through mockito.

    ``setUp`` installs a catch-all stub that answers any GET with an empty JSON
    object; each test then registers more specific stubs for the URLs it cares
    about.
    """

    @classmethod
    def setUpClass(cls) -> None:
        cls.token = 'token'
        cls.auth = {'Authorization': cls.token}
        cls.subscription_id = 'subscription-id'
        cls.account_name = 'test-account'
        cls.resource_groups = ['resourceGroup1', 'resourceGroup2']
        cls.resource_groups_text = json.dumps({"value": [{"name": cls.resource_groups[0]}, {"name": cls.resource_groups[1]}]})

    def setUp(self) -> None:
        self.temp_dir = tempfile.TemporaryDirectory()
        self.azure_settings = AzureCloudScanSettings(
            commands_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), 'resources', 'azure_scan_commands.yaml'),
            subscription_id=self.subscription_id,
            account_name=self.account_name,
            should_clean_before_scan=True,
            output_path=self.temp_dir.name
        )
        # Catch-all answer; must be registered before the per-test stubs.
        when(dragoneye.cloud_scanner.azure.azure_scanner).invoke_get_request(ANY, ANY, on_giveup=ANY).\
            thenReturn(mock({'status_code': 200, 'text': '{}'}))

    def tearDown(self) -> None:
        # Always remove the stubs, even if deleting the temp directory fails,
        # so a failure here cannot leak stubs into the next test.
        try:
            self.temp_dir.cleanup()
        finally:
            unstub()

    # ------------------------------------------------------------------ #
    # URL / payload builders
    # ------------------------------------------------------------------ #
    def _subscription_url(self):
        """Return the management URL prefix for the test subscription."""
        return f'https://management.azure.com/subscriptions/{self.subscription_id}'

    def _vm_list_url(self, resource_group):
        """Return the 'request1' URL (list VMs in ``resource_group``)."""
        return (f'{self._subscription_url()}/resourceGroups/{resource_group}'
                f'/providers/Microsoft.Compute/virtualMachines?api-version=2020-12-01')

    def _vm_url(self, resource_group):
        """Return the 'request2' URL (the single ``<resource_group>-vm`` VM)."""
        return (f'{self._subscription_url()}/resourceGroups/{resource_group}'
                f'/providers/Microsoft.Compute/virtualMachines/{resource_group}-vm?api-version=2020-12-01')

    def _vm_payload(self, resource_group):
        """Return the JSON text describing the one VM of ``resource_group``."""
        rg_id = "/subscriptions/{}/resourceGroups/{}"
        return json.dumps({"value": [{"id": rg_id.format(self.subscription_id, resource_group),
                                      "vmName": f'{resource_group}-vm'}]})

    # ------------------------------------------------------------------ #
    # Stubbing helpers
    # ------------------------------------------------------------------ #
    def _stub_get(self, url):
        """Start a stub for a GET on ``url``; chain thenReturn/thenRaise on the result."""
        return when(dragoneye.cloud_scanner.azure.azure_scanner) \
            .invoke_get_request(url, self.auth, on_giveup=ANY)

    def _stub_resource_groups(self):
        """Answer the resource-group listing with both test resource groups."""
        self._stub_get(f'{self._subscription_url()}/resourcegroups?api-version=2020-09-01') \
            .thenReturn(mock({'status_code': 200, 'text': self.resource_groups_text}))

    def _stub_vm_list(self, resource_group, text):
        """Answer 'request1' for ``resource_group`` with HTTP 200 and ``text``."""
        self._stub_get(self._vm_list_url(resource_group)) \
            .thenReturn(mock({'status_code': 200, 'text': text}))

    def _stub_vm(self, resource_group):
        """Answer 'request2' for the VM of ``resource_group`` with its payload."""
        self._stub_get(self._vm_url(resource_group)) \
            .thenReturn(mock({'status_code': 200, 'text': self._vm_payload(resource_group)}))

    def _stub_request3_failure(self):
        """Make 'request3' raise instead of returning a response."""
        self._stub_get(f'{self._subscription_url()}/providers/Microsoft.Compute/request3?api-version=2020-12-01') \
            .thenRaise(Exception('some exception'))

    def _stub_populated_subscription(self):
        """Stub resource groups, then request1 and request2 for both groups."""
        self._stub_resource_groups()
        for resource_group in self.resource_groups:
            self._stub_vm_list(resource_group, self._vm_payload(resource_group))
        for resource_group in self.resource_groups:
            self._stub_vm(resource_group)

    # ------------------------------------------------------------------ #
    # Act / assert helpers
    # ------------------------------------------------------------------ #
    def _scan(self):
        """Run a scan and return ``(output_path, account_data_dir)``."""
        scanner = AzureScanner(self.token, self.azure_settings)
        output_path = scanner.scan()
        return output_path, os.path.join(output_path, self.account_name)

    def _assert_result_files_exist(self, account_data_dir):
        """Assert the three result files every scan is expected to write."""
        for file_name in ('resource-groups.json', 'request1.json', 'request2.json'):
            self.assertTrue(os.path.isfile(os.path.join(account_data_dir, file_name)))

    def _load_value(self, account_data_dir, file_name):
        """Return the ``value`` entry of the JSON result file ``file_name``."""
        with open(os.path.join(account_data_dir, file_name), 'r') as result_file:
            return json.load(result_file)['value']

    def _assert_resource_groups_scanned(self, account_data_dir):
        """Assert both resource groups appear in resource-groups.json."""
        value = self._load_value(account_data_dir, 'resource-groups.json')
        for resource_group in self.resource_groups:
            self.assertTrue(any(dic['name'] == resource_group for dic in value))

    def _assert_vms_scanned(self, account_data_dir, file_name):
        """Assert ``file_name`` holds a VM and a resourceGroup entry per group."""
        value = self._load_value(account_data_dir, file_name)
        for resource_group in self.resource_groups:
            self.assertTrue(any(dic['vmName'] == f'{resource_group}-vm' for dic in value))
        for resource_group in self.resource_groups:
            self.assertTrue(any(dic['resourceGroup'] == resource_group for dic in value))

    # ------------------------------------------------------------------ #
    # Tests
    # ------------------------------------------------------------------ #
    def test_scan_ok_with_values(self):
        # Arrange
        self._stub_populated_subscription()
        # Act
        _, account_data_dir = self._scan()
        # Assert
        self._assert_result_files_exist(account_data_dir)
        self._assert_resource_groups_scanned(account_data_dir)
        self._assert_vms_scanned(account_data_dir, 'request1.json')
        self._assert_vms_scanned(account_data_dir, 'request2.json')

    def test_scan_ok_no_resources(self):
        # Arrange
        self._stub_resource_groups()
        for resource_group in self.resource_groups:
            self._stub_vm_list(resource_group, json.dumps({"value": []}))
        # Act
        _, account_data_dir = self._scan()
        # Assert
        self._assert_result_files_exist(account_data_dir)
        self._assert_resource_groups_scanned(account_data_dir)
        self.assertListEqual(self._load_value(account_data_dir, 'request1.json'), [])
        self.assertListEqual(self._load_value(account_data_dir, 'request2.json'), [])

    @patch('logging.Logger.exception')
    def test_scan_failed_request(self, patched_logger):
        """
        Testing that even if a specific request raises an exception, we continue to scan and do not crash.
        """
        # Arrange
        self._stub_populated_subscription()
        self._stub_request3_failure()
        # Act
        _, account_data_dir = self._scan()
        # Assert
        self._assert_result_files_exist(account_data_dir)
        self.assertFalse(os.path.isfile(os.path.join(account_data_dir, 'request3.json')))
        self._assert_resource_groups_scanned(account_data_dir)
        self._assert_vms_scanned(account_data_dir, 'request1.json')
        self._assert_vms_scanned(account_data_dir, 'request2.json')
        # call_args is None when the logger was never called; fail with a clear
        # assertion instead of a TypeError from iterating None.
        patched_logger.assert_called()
        call_args = '\n'.join(str(arg) for arg in patched_logger.call_args)
        self.assertIn('some exception', call_args)

    @patch('logging.Logger.exception')
    def test_failed_request_be_written_to_failures_summary(self, unused_patched_logger):
        """
        Testing that even if a specific request failed we write the failure to the report.
        """
        # Arrange
        self._stub_resource_groups()
        self._stub_get(self._vm_list_url(self.resource_groups[0])) \
            .thenReturn(mock({'status_code': 500, 'content': b'{"error": {"code": 500, "message": "some message"}}'}))
        self._stub_request3_failure()
        # Act
        output_path, account_data_dir = self._scan()
        # Assert
        self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'resource-groups.json')))
        self._assert_failures_report_file(output_path, {'error': {'code': 500, 'message': 'some message'},
                                                        'request': 'https://management.azure.com/subscriptions/subscription-id'
                                                                   '/resourceGroups/resourceGroup1/providers/Microsoft.Compute/'
                                                                   'virtualMachines?api-version=2020-12-01'})

    def _assert_failures_report_file(self, result_path, failure):
        """Assert failures-report.json under ``result_path`` holds exactly one failure."""
        with open(os.path.join(result_path, 'failures-report.json')) as failures_file:
            failures = json.load(failures_file)
            self.assertEqual(len(failures), 1)
            # NOTE(review): the snippet is cut off here in the source. The
            # `failure` argument is presumably compared with failures[0] in the
            # lines that follow - confirm against the complete file.
Source:list.py  
1# -*- coding: utf-8 -*- #2# Copyright 2022 Google LLC. All Rights Reserved.3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8#    http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15"""Utils for running gcloud command and kubectl command."""16from __future__ import absolute_import17from __future__ import division18from __future__ import unicode_literals19import json20from googlecloudsdk.command_lib.anthos.config.sync.common import exceptions21from googlecloudsdk.command_lib.anthos.config.sync.common import utils22from googlecloudsdk.core import log23def ListResources(project, name, namespace, repo_cluster, membership):24  """List managed resources.25  Args:26    project: The project id the repo is from.27    name: The name of the corresponding ResourceGroup CR.28    namespace: The namespace of the corresponding ResourceGroup CR.29    repo_cluster: The cluster that the repo is synced to.30    membership: membership name that the repo should be from.31  Returns:32    List of raw ResourceGroup dicts33  """34  if membership and repo_cluster:35    raise exceptions.ConfigSyncError(36        'only one of --membership and --cluster may be specified.')37  resource_groups = []38  # Get ResourceGroups from the Config Controller cluster39  if not membership:  # exclude CC clusters if membership option is provided40    cc_rg = _GetResourceGroupsFromConfigController(41        project, name, namespace, repo_cluster)42    resource_groups.extend(cc_rg)43  # Get ResourceGroups from memberships44  member_rg = _GetResourceGroupsFromMemberships(45  
    project, name, namespace, repo_cluster, membership)46  resource_groups.extend(member_rg)47  # Parse ResourceGroups to structured output48  return ParseResultFromRawResourceGroups(resource_groups)49def _GetResourceGroupsFromConfigController(50    project, name, namespace, repo_cluster):51  """List all ResourceGroup CRs from Config Controller clusters.52  Args:53    project: The project id the repo is from.54    name: The name of the corresponding ResourceGroup CR.55    namespace: The namespace of the corresponding ResourceGroup CR.56    repo_cluster: The cluster that the repo is synced to.57  Returns:58    List of raw ResourceGroup dicts59  """60  clusters = []61  resource_groups = []62  try:63    # TODO(b/218518163): support resources applied by kpt live apply64    clusters = utils.ListConfigControllerClusters(project)65  except exceptions.ConfigSyncError as err:66    log.error(err)67  if clusters:68    for cluster in clusters:69      if repo_cluster and repo_cluster != cluster[0]:70        continue71      try:72        utils.KubeconfigForCluster(project, cluster[1], cluster[0])73        cc_rg = _GetResourceGroups(cluster[0], 'Config Controller', name,74                                   namespace)75        if cc_rg:76          resource_groups.extend(cc_rg)77      except exceptions.ConfigSyncError as err:78        log.error(err)79  return resource_groups80def _GetResourceGroupsFromMemberships(81    project, name, namespace, repo_cluster, membership):82  """List all ResourceGroup CRs from the provided membership cluster.83  Args:84    project: The project id the repo is from.85    name: The name of the corresponding ResourceGroup CR.86    namespace: The namespace of the corresponding ResourceGroup CR.87    repo_cluster: The cluster that the repo is synced to.88    membership: membership name that the repo should be from.89  Returns:90    List of raw ResourceGroup dicts91  """92  resource_groups = []93  try:94    memberships = utils.ListMemberships(project)95  
except exceptions.ConfigSyncError as err:96    raise err97  for member in memberships:98    if membership and not utils.MembershipMatched(member, membership):99      continue100    if repo_cluster and repo_cluster != member:101      continue102    try:103      utils.KubeconfigForMembership(project, member)104      member_rg = _GetResourceGroups(member, 'Membership', name, namespace)105      if member_rg:106        resource_groups.extend(member_rg)107    except exceptions.ConfigSyncError as err:108      log.error(err)109  return resource_groups110def _GetResourceGroups(cluster_name, cluster_type, name, namespace):111  """List all the ResourceGroup CRs from the given cluster.112  Args:113    cluster_name: The membership name or cluster name of the current cluster.114    cluster_type: The type of the current cluster. It is either a Fleet-cluster115      or a Config-controller cluster.116    name: The name of the desired ResourceGroup.117    namespace: The namespace of the desired ResourceGroup.118  Returns:119    List of raw ResourceGroup dicts120  Raises:121    Error: errors that happen when listing the CRs from the cluster.122  """123  utils.GetConfigManagement(cluster_name, cluster_type)124  if not namespace:125    params = ['--all-namespaces']126  else:127    params = ['-n', namespace]128  repos, err = utils.RunKubectl(129      ['get', 'resourcegroup.kpt.dev', '-o', 'json'] + params)130  if err:131    raise exceptions.ConfigSyncError(132        'Error getting ResourceGroup custom resources for cluster {}: {}'133        .format(cluster_name, err))134  if not repos:135    return []136  obj = json.loads(repos)137  if 'items' not in obj or not obj['items']:138    return []139  resource_groups = []140  for item in obj['items']:141    _, nm = utils.GetObjectKey(item)142    if name and nm != name:143      continue144    resource_groups.append(RawResourceGroup(cluster_name, item))145  return resource_groups146class RawResourceGroup:147  """Representation of the raw 
ResourceGroup output from kubectl."""148  def __init__(self, cluster, rg_dict):149    """Initialize a RawResourceGroup object.150    Args:151      cluster: name of the cluster the results are from152      rg_dict: raw ResourceGroup dictionary parsed from kubectl153    """154    self.cluster = cluster155    self.rg_dict = rg_dict156class ListItem:157  """Result class to be returned to gcloud."""158  def __init__(self, cluster_name='', group='', kind='', namespace='', name='',159               status='', condition=''):160    """Initialize a ListItem object.161    Args:162      cluster_name: name of the cluster the results are from163      group: group of the resource164      kind: kind of the resource165      namespace: namespace of the resource166      name: name of the resource167      status: status of the resource168      condition: condition message of the resource169    """170    self.cluster_name = cluster_name171    self.group = group172    self.kind = kind173    self.namespace = namespace174    self.name = name175    self.status = status176    self.condition = condition177  @classmethod178  def FromResourceStatus(cls, cluster_name, resource):179    """Initialize a ListItem object from a resourceStatus.180    Args:181      cluster_name: name of the cluster the results are from182      resource: individual resource status dictionary parsed from kubectl183    Returns:184      new instance of ListItem185    """186    condition = ''187    reconcile_condition = utils.GetActuationCondition(resource)188    conditions = resource.get('conditions', [])[:]189    if reconcile_condition:190      conditions.insert(0, reconcile_condition)191    if conditions:192      delimited_msg = ', '.join(193          ["'{}'".format(c['message']) for c in conditions])194      condition = '[{}]'.format(delimited_msg)195    return cls(196        cluster_name=cluster_name,197        group=resource['group'],198        kind=resource['kind'],199        namespace=resource['namespace'],200      
  name=resource['name'],201        status=resource['status'],202        condition=condition,203    )204  def __eq__(self, other):205    attributes = ['cluster_name', 'group', 'kind', 'namespace', 'name',206                  'status', 'condition']207    for a in attributes:208      if getattr(self, a) != getattr(other, a):209        return False210    return True211def ParseResultFromRawResourceGroups(raw_resource_groups):212  """Parse from RawResourceGroup.213  Args:214    raw_resource_groups: List of RawResourceGroup215  Returns:216    List of ListItems217  """218  resources = []219  for raw_rg in raw_resource_groups:220    cluster = raw_rg.cluster221    resource_statuses = raw_rg.rg_dict['status'].get('resourceStatuses', [])222    for rs in resource_statuses:223      resources.append(ListItem.FromResourceStatus(cluster, rs))...manage_rg.py
Source:manage_rg.py  
"""Command-line helpers for listing, creating, tagging and deleting Azure resource groups.

Reads the target subscription from the SUBSCRIPTION_ID environment variable and
authenticates with AzureCliCredential.
"""
import os

from azure.identity import AzureCliCredential
from azure.mgmt.resource import ResourceManagementClient

# Location used for every resource group this script creates or updates.
DEFAULT_LOCATION = "westeurope"
# Resource group the script operates on when none is given.
DEFAULT_RG_NAME = "aino-test-rg"

# Raises KeyError at import time when SUBSCRIPTION_ID is not set.
subscription_id = os.environ["SUBSCRIPTION_ID"]
credential = AzureCliCredential()
resource_client = ResourceManagementClient(credential, subscription_id)


def list_rg():
    """Print a two-column table (name, location) of the resource groups."""
    column_width = 40
    print("Resource Group".ljust(column_width) + "Location")
    print("-" * (column_width * 2))

    # Iterate the result directly; there is no need to copy it into a list.
    for group in resource_client.resource_groups.list():
        print(f"{group.name:<{column_width}}{group.location}")


def create_rg(rg_name, location=DEFAULT_LOCATION):
    """Create (or update) the resource group `rg_name` in `location`."""
    resource_client.resource_groups.create_or_update(rg_name, {"location": location})


def get_rg(rg_name):
    """Print whether a resource group named `rg_name` exists."""
    print(f"Resource group with name [{rg_name}] exists: ", end="")
    print(resource_client.resource_groups.check_existence(rg_name))


def update_rg(tag, value, rg_name=DEFAULT_RG_NAME):
    """Send a create_or_update for `rg_name` carrying the single tag `tag`=`value`."""
    rg_params = {"location": DEFAULT_LOCATION, "tags": {tag: value}}
    resource_client.resource_groups.create_or_update(rg_name, rg_params)


def delete_rg(rg_name):
    """Start deleting `rg_name` and block until the operation finishes."""
    # Named `poller` so the local does not shadow this function's own name.
    poller = resource_client.resource_groups.begin_delete(rg_name)
    poller.wait()


def main():
    """Run one of the operations; uncomment the call you need."""
#    create_rg("aino-test-rg")
#    list_rg()
#    get_rg("aino-test-rg")
#    update_rg("tag2", "value2")
    delete_rg("aino-test-rg")


# NOTE(review): the visible snippet ends right after main() without calling it;
# the guard below makes the script runnable - drop it if the complete file
# already contains one.
if __name__ == "__main__":
    main()
...Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
