How to use resource_groups method in localstack

Best Python code snippet using localstack_python

test_azure_scanner.py

Source:test_azure_scanner.py Github

copy

Full Screen

1import json2import os3import tempfile4import unittest5from unittest.mock import patch, ANY6from dragoneye.cloud_scanner.azure.azure_scanner import AzureScanner, AzureCloudScanSettings7from mockito import when, unstub, mock8import dragoneye9class TestAzureScanner(unittest.TestCase):10 @classmethod11 def setUpClass(cls) -> None:12 cls.token = 'token'13 cls.auth = {'Authorization': cls.token}14 cls.subscription_id = 'subscription-id'15 cls.account_name = 'test-account'16 cls.resource_groups = ['resourceGroup1', 'resourceGroup2']17 cls.resource_groups_text = json.dumps({"value": [{"name": cls.resource_groups[0]}, {"name": cls.resource_groups[1]}]})18 def setUp(self) -> None:19 self.temp_dir = tempfile.TemporaryDirectory()20 self.azure_settings = AzureCloudScanSettings(21 commands_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), 'resources', 'azure_scan_commands.yaml'),22 subscription_id=self.subscription_id,23 account_name=self.account_name,24 should_clean_before_scan=True,25 output_path=self.temp_dir.name26 )27 when(dragoneye.cloud_scanner.azure.azure_scanner).invoke_get_request(ANY, ANY, on_giveup=ANY).\28 thenReturn(mock({'status_code': 200, 'text': '{}'}))29 def tearDown(self) -> None:30 self.temp_dir.cleanup()31 unstub()32 def test_scan_ok_with_values(self):33 # Arrange34 rg_id = "/subscriptions/{}/resourceGroups/{}"35 ### Resource Groups36 when(dragoneye.cloud_scanner.azure.azure_scanner)\37 .invoke_get_request(38 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourcegroups?api-version=2020-09-01',39 self.auth, on_giveup=ANY) \40 .thenReturn(mock({'status_code': 200, 'text': self.resource_groups_text}))41 ### request1, resourceGroup142 when(dragoneye.cloud_scanner.azure.azure_scanner)\43 .invoke_get_request(44 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[0]}/providers/Microsoft.Compute/virtualMachines?api-version=2020-12-01',45 self.auth, on_giveup=ANY)\46 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": [{"id": rg_id.format(self.subscription_id, self.resource_groups[0]), "vmName": f'{self.resource_groups[0]}-vm'}]})}))47 ### request1, resourceGroup248 when(dragoneye.cloud_scanner.azure.azure_scanner) \49 .invoke_get_request(50 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[1]}/providers/Microsoft.Compute/virtualMachines?api-version=2020-12-01',51 self.auth, on_giveup=ANY) \52 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": [{"id": rg_id.format(self.subscription_id, self.resource_groups[1]), "vmName": f'{self.resource_groups[1]}-vm'}]})}))53 ### request2, vm154 when(dragoneye.cloud_scanner.azure.azure_scanner)\55 .invoke_get_request(56 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[0]}/providers/Microsoft.Compute/virtualMachines/{f"{self.resource_groups[0]}-vm"}?api-version=2020-12-01',57 self.auth, on_giveup=ANY)\58 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": [{"id": rg_id.format(self.subscription_id, self.resource_groups[0]), "vmName": f'{self.resource_groups[0]}-vm'}]})}))59 ### request2, vm260 when(dragoneye.cloud_scanner.azure.azure_scanner)\61 .invoke_get_request(62 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[1]}/providers/Microsoft.Compute/virtualMachines/{f"{self.resource_groups[1]}-vm"}?api-version=2020-12-01',63 self.auth, on_giveup=ANY)\64 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": [{"id": rg_id.format(self.subscription_id, self.resource_groups[1]), "vmName": f'{self.resource_groups[1]}-vm'}]})}))65 # Act66 scanner = AzureScanner(self.token, self.azure_settings)67 output_path = scanner.scan()68 account_data_dir = os.path.join(output_path, self.account_name)69 # Assert70 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'resource-groups.json')))71 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'request1.json')))72 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'request2.json')))73 with open(os.path.join(account_data_dir, 'resource-groups.json'), 'r') as result_file:74 results = json.load(result_file)75 value = results['value']76 self.assertTrue(any(dic['name'] == self.resource_groups[0] for dic in value))77 self.assertTrue(any(dic['name'] == self.resource_groups[1] for dic in value))78 with open(os.path.join(account_data_dir, 'request1.json'), 'r') as result_file:79 results = json.load(result_file)80 value = results['value']81 self.assertTrue(any(dic['vmName'] == f'{self.resource_groups[0]}-vm' for dic in value))82 self.assertTrue(any(dic['vmName'] == f'{self.resource_groups[1]}-vm' for dic in value))83 self.assertTrue(any(dic['resourceGroup'] == self.resource_groups[0] for dic in value))84 self.assertTrue(any(dic['resourceGroup'] == self.resource_groups[1] for dic in value))85 with open(os.path.join(account_data_dir, 'request2.json'), 'r') as result_file:86 results = json.load(result_file)87 value = results['value']88 self.assertTrue(any(dic['vmName'] == f'{self.resource_groups[0]}-vm' for dic in value))89 self.assertTrue(any(dic['vmName'] == f'{self.resource_groups[1]}-vm' for dic in value))90 self.assertTrue(any(dic['resourceGroup'] == self.resource_groups[0] for dic in value))91 self.assertTrue(any(dic['resourceGroup'] == self.resource_groups[1] for dic in value))92 def test_scan_ok_no_resources(self):93 # Arrange94 ### Resource Groups95 when(dragoneye.cloud_scanner.azure.azure_scanner)\96 .invoke_get_request(97 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourcegroups?api-version=2020-09-01',98 self.auth, on_giveup=ANY) \99 .thenReturn(mock({'status_code': 200, 'text': self.resource_groups_text}))100 ### request1, resourceGroup1101 when(dragoneye.cloud_scanner.azure.azure_scanner)\102 .invoke_get_request(103 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[0]}/providers/Microsoft.Compute/virtualMachines?api-version=2020-12-01',104 self.auth, on_giveup=ANY)\105 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": []})}))106 ### request1, resourceGroup2107 when(dragoneye.cloud_scanner.azure.azure_scanner) \108 .invoke_get_request(109 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[1]}/providers/Microsoft.Compute/virtualMachines?api-version=2020-12-01',110 self.auth, on_giveup=ANY) \111 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": []})}))112 # Act113 scanner = AzureScanner(self.token, self.azure_settings)114 output_path = scanner.scan()115 account_data_dir = os.path.join(output_path, self.account_name)116 # Assert117 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'resource-groups.json')))118 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'request1.json')))119 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'request2.json')))120 with open(os.path.join(account_data_dir, 'resource-groups.json'), 'r') as result_file:121 results = json.load(result_file)122 value = results['value']123 self.assertTrue(any(dic['name'] == self.resource_groups[0] for dic in value))124 self.assertTrue(any(dic['name'] == self.resource_groups[1] for dic in value))125 with open(os.path.join(account_data_dir, 'request1.json'), 'r') as result_file:126 results = json.load(result_file)127 value = results['value']128 self.assertListEqual(value, list())129 with open(os.path.join(account_data_dir, 'request2.json'), 'r') as result_file:130 results = json.load(result_file)131 value = results['value']132 self.assertListEqual(value, list())133 @patch('logging.Logger.exception')134 def test_scan_failed_request(self, patched_logger):135 """136 Testing that even if a specific request raises an exception, we continue to scan and do not crash.137 """138 # Arrange139 rg_id = "/subscriptions/{}/resourceGroups/{}"140 ### Resource Groups141 when(dragoneye.cloud_scanner.azure.azure_scanner)\142 .invoke_get_request(143 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourcegroups?api-version=2020-09-01',144 self.auth, on_giveup=ANY) \145 .thenReturn(mock({'status_code': 200, 'text': self.resource_groups_text}))146 ### request1, resourceGroup1147 when(dragoneye.cloud_scanner.azure.azure_scanner)\148 .invoke_get_request(149 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[0]}/providers/Microsoft.Compute/virtualMachines?api-version=2020-12-01',150 self.auth, on_giveup=ANY)\151 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": [{"id": rg_id.format(self.subscription_id, self.resource_groups[0]), "vmName": f'{self.resource_groups[0]}-vm'}]})}))152 ### request1, resourceGroup2153 when(dragoneye.cloud_scanner.azure.azure_scanner) \154 .invoke_get_request(155 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[1]}/providers/Microsoft.Compute/virtualMachines?api-version=2020-12-01',156 self.auth, on_giveup=ANY) \157 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": [{"id": rg_id.format(self.subscription_id, self.resource_groups[1]), "vmName": f'{self.resource_groups[1]}-vm'}]})}))158 ### request2, vm1159 when(dragoneye.cloud_scanner.azure.azure_scanner)\160 .invoke_get_request(161 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[0]}/providers/Microsoft.Compute/virtualMachines/{f"{self.resource_groups[0]}-vm"}?api-version=2020-12-01',162 self.auth, on_giveup=ANY)\163 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": [{"id": rg_id.format(self.subscription_id, self.resource_groups[0]), "vmName": f'{self.resource_groups[0]}-vm'}]})}))164 ### request2, vm2165 when(dragoneye.cloud_scanner.azure.azure_scanner)\166 .invoke_get_request(167 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[1]}/providers/Microsoft.Compute/virtualMachines/{f"{self.resource_groups[1]}-vm"}?api-version=2020-12-01',168 self.auth, on_giveup=ANY)\169 .thenReturn(mock({'status_code': 200, 'text': json.dumps({"value": [{"id": rg_id.format(self.subscription_id, self.resource_groups[1]), "vmName": f'{self.resource_groups[1]}-vm'}]})}))170 ### request3171 when(dragoneye.cloud_scanner.azure.azure_scanner)\172 .invoke_get_request(173 f'https://management.azure.com/subscriptions/{self.subscription_id}/providers/Microsoft.Compute/request3?api-version=2020-12-01',174 self.auth, on_giveup=ANY)\175 .thenRaise(Exception('some exception'))176 # Act177 scanner = AzureScanner(self.token, self.azure_settings)178 output_path = scanner.scan()179 account_data_dir = os.path.join(output_path, self.account_name)180 # Assert181 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'resource-groups.json')))182 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'request1.json')))183 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'request2.json')))184 self.assertFalse(os.path.isfile(os.path.join(account_data_dir, 'request3.json')))185 with open(os.path.join(account_data_dir, 'resource-groups.json'), 'r') as result_file:186 results = json.load(result_file)187 value = results['value']188 self.assertTrue(any(dic['name'] == self.resource_groups[0] for dic in value))189 self.assertTrue(any(dic['name'] == self.resource_groups[1] for dic in value))190 with open(os.path.join(account_data_dir, 'request1.json'), 'r') as result_file:191 results = json.load(result_file)192 value = results['value']193 self.assertTrue(any(dic['vmName'] == f'{self.resource_groups[0]}-vm' for dic in value))194 self.assertTrue(any(dic['vmName'] == f'{self.resource_groups[1]}-vm' for dic in value))195 self.assertTrue(any(dic['resourceGroup'] == self.resource_groups[0] for dic in value))196 self.assertTrue(any(dic['resourceGroup'] == self.resource_groups[1] for dic in value))197 with open(os.path.join(account_data_dir, 'request2.json'), 'r') as result_file:198 results = json.load(result_file)199 value = results['value']200 self.assertTrue(any(dic['vmName'] == f'{self.resource_groups[0]}-vm' for dic in value))201 self.assertTrue(any(dic['vmName'] == f'{self.resource_groups[1]}-vm' for dic in value))202 self.assertTrue(any(dic['resourceGroup'] == self.resource_groups[0] for dic in value))203 self.assertTrue(any(dic['resourceGroup'] == self.resource_groups[1] for dic in value))204 call_args = '\n'.join(str(arg) for arg in patched_logger.call_args)205 self.assertIn('some exception', call_args)206 @patch('logging.Logger.exception')207 def test_failed_request_be_written_to_failures_summary(self, unused_patched_logger):208 """209 Testing that even if a specific request failed we write the failure to the report.210 """211 # Arrange212 ### Resource Groups213 when(dragoneye.cloud_scanner.azure.azure_scanner) \214 .invoke_get_request(215 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourcegroups?api-version=2020-09-01',216 self.auth, on_giveup=ANY) \217 .thenReturn(mock({'status_code': 200, 'text': self.resource_groups_text}))218 ### request1, resourceGroup1219 when(dragoneye.cloud_scanner.azure.azure_scanner) \220 .invoke_get_request(221 f'https://management.azure.com/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_groups[0]}'222 f'/providers/Microsoft.Compute/virtualMachines?api-version=2020-12-01',223 self.auth, on_giveup=ANY) \224 .thenReturn(mock({'status_code': 500, 'content': b'{"error": {"code": 500, "message": "some message"}}'}))225 ### request3226 when(dragoneye.cloud_scanner.azure.azure_scanner) \227 .invoke_get_request(228 f'https://management.azure.com/subscriptions/{self.subscription_id}/providers/Microsoft.Compute/request3?api-version=2020-12-01',229 self.auth, on_giveup=ANY) \230 .thenRaise(Exception('some exception'))231 # Act232 scanner = AzureScanner(self.token, self.azure_settings)233 output_path = scanner.scan()234 account_data_dir = os.path.join(output_path, self.account_name)235 # Assert236 self.assertTrue(os.path.isfile(os.path.join(account_data_dir, 'resource-groups.json')))237 self._assert_failures_report_file(output_path, {'error': {'code': 500, 'message': 'some message'},238 'request': 'https://management.azure.com/subscriptions/subscription-id'239 '/resourceGroups/resourceGroup1/providers/Microsoft.Compute/'240 'virtualMachines?api-version=2020-12-01'})241 def _assert_failures_report_file(self, result_path, failure):242 with open(os.path.join(result_path, 'failures-report.json')) as failures_file:243 failures = json.loads(failures_file.read())244 self.assertEqual(len(failures), 1)...

Full Screen

Full Screen

list.py

Source:list.py Github

copy

Full Screen

1# -*- coding: utf-8 -*- #2# Copyright 2022 Google LLC. All Rights Reserved.3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15"""Utils for running gcloud command and kubectl command."""16from __future__ import absolute_import17from __future__ import division18from __future__ import unicode_literals19import json20from googlecloudsdk.command_lib.anthos.config.sync.common import exceptions21from googlecloudsdk.command_lib.anthos.config.sync.common import utils22from googlecloudsdk.core import log23def ListResources(project, name, namespace, repo_cluster, membership):24 """List managed resources.25 Args:26 project: The project id the repo is from.27 name: The name of the corresponding ResourceGroup CR.28 namespace: The namespace of the corresponding ResourceGroup CR.29 repo_cluster: The cluster that the repo is synced to.30 membership: membership name that the repo should be from.31 Returns:32 List of raw ResourceGroup dicts33 """34 if membership and repo_cluster:35 raise exceptions.ConfigSyncError(36 'only one of --membership and --cluster may be specified.')37 resource_groups = []38 # Get ResourceGroups from the Config Controller cluster39 if not membership: # exclude CC clusters if membership option is provided40 cc_rg = _GetResourceGroupsFromConfigController(41 project, name, namespace, repo_cluster)42 resource_groups.extend(cc_rg)43 # Get ResourceGroups from memberships44 member_rg = _GetResourceGroupsFromMemberships(45 project, name, namespace, repo_cluster, membership)46 resource_groups.extend(member_rg)47 # Parse ResourceGroups to structured output48 return ParseResultFromRawResourceGroups(resource_groups)49def _GetResourceGroupsFromConfigController(50 project, name, namespace, repo_cluster):51 """List all ResourceGroup CRs from Config Controller clusters.52 Args:53 project: The project id the repo is from.54 name: The name of the corresponding ResourceGroup CR.55 namespace: The namespace of the corresponding ResourceGroup CR.56 repo_cluster: The cluster that the repo is synced to.57 Returns:58 List of raw ResourceGroup dicts59 """60 clusters = []61 resource_groups = []62 try:63 # TODO(b/218518163): support resources applied by kpt live apply64 clusters = utils.ListConfigControllerClusters(project)65 except exceptions.ConfigSyncError as err:66 log.error(err)67 if clusters:68 for cluster in clusters:69 if repo_cluster and repo_cluster != cluster[0]:70 continue71 try:72 utils.KubeconfigForCluster(project, cluster[1], cluster[0])73 cc_rg = _GetResourceGroups(cluster[0], 'Config Controller', name,74 namespace)75 if cc_rg:76 resource_groups.extend(cc_rg)77 except exceptions.ConfigSyncError as err:78 log.error(err)79 return resource_groups80def _GetResourceGroupsFromMemberships(81 project, name, namespace, repo_cluster, membership):82 """List all ResourceGroup CRs from the provided membership cluster.83 Args:84 project: The project id the repo is from.85 name: The name of the corresponding ResourceGroup CR.86 namespace: The namespace of the corresponding ResourceGroup CR.87 repo_cluster: The cluster that the repo is synced to.88 membership: membership name that the repo should be from.89 Returns:90 List of raw ResourceGroup dicts91 """92 resource_groups = []93 try:94 memberships = utils.ListMemberships(project)95 except exceptions.ConfigSyncError as err:96 raise err97 for member in memberships:98 if membership and not utils.MembershipMatched(member, membership):99 continue100 if repo_cluster and repo_cluster != member:101 continue102 try:103 utils.KubeconfigForMembership(project, member)104 member_rg = _GetResourceGroups(member, 'Membership', name, namespace)105 if member_rg:106 resource_groups.extend(member_rg)107 except exceptions.ConfigSyncError as err:108 log.error(err)109 return resource_groups110def _GetResourceGroups(cluster_name, cluster_type, name, namespace):111 """List all the ResourceGroup CRs from the given cluster.112 Args:113 cluster_name: The membership name or cluster name of the current cluster.114 cluster_type: The type of the current cluster. It is either a Fleet-cluster115 or a Config-controller cluster.116 name: The name of the desired ResourceGroup.117 namespace: The namespace of the desired ResourceGroup.118 Returns:119 List of raw ResourceGroup dicts120 Raises:121 Error: errors that happen when listing the CRs from the cluster.122 """123 utils.GetConfigManagement(cluster_name, cluster_type)124 if not namespace:125 params = ['--all-namespaces']126 else:127 params = ['-n', namespace]128 repos, err = utils.RunKubectl(129 ['get', 'resourcegroup.kpt.dev', '-o', 'json'] + params)130 if err:131 raise exceptions.ConfigSyncError(132 'Error getting ResourceGroup custom resources for cluster {}: {}'133 .format(cluster_name, err))134 if not repos:135 return []136 obj = json.loads(repos)137 if 'items' not in obj or not obj['items']:138 return []139 resource_groups = []140 for item in obj['items']:141 _, nm = utils.GetObjectKey(item)142 if name and nm != name:143 continue144 resource_groups.append(RawResourceGroup(cluster_name, item))145 return resource_groups146class RawResourceGroup:147 """Representation of the raw ResourceGroup output from kubectl."""148 def __init__(self, cluster, rg_dict):149 """Initialize a RawResourceGroup object.150 Args:151 cluster: name of the cluster the results are from152 rg_dict: raw ResourceGroup dictionary parsed from kubectl153 """154 self.cluster = cluster155 self.rg_dict = rg_dict156class ListItem:157 """Result class to be returned to gcloud."""158 def __init__(self, cluster_name='', group='', kind='', namespace='', name='',159 status='', condition=''):160 """Initialize a ListItem object.161 Args:162 cluster_name: name of the cluster the results are from163 group: group of the resource164 kind: kind of the resource165 namespace: namespace of the resource166 name: name of the resource167 status: status of the resource168 condition: condition message of the resource169 """170 self.cluster_name = cluster_name171 self.group = group172 self.kind = kind173 self.namespace = namespace174 self.name = name175 self.status = status176 self.condition = condition177 @classmethod178 def FromResourceStatus(cls, cluster_name, resource):179 """Initialize a ListItem object from a resourceStatus.180 Args:181 cluster_name: name of the cluster the results are from182 resource: individual resource status dictionary parsed from kubectl183 Returns:184 new instance of ListItem185 """186 condition = ''187 reconcile_condition = utils.GetActuationCondition(resource)188 conditions = resource.get('conditions', [])[:]189 if reconcile_condition:190 conditions.insert(0, reconcile_condition)191 if conditions:192 delimited_msg = ', '.join(193 ["'{}'".format(c['message']) for c in conditions])194 condition = '[{}]'.format(delimited_msg)195 return cls(196 cluster_name=cluster_name,197 group=resource['group'],198 kind=resource['kind'],199 namespace=resource['namespace'],200 name=resource['name'],201 status=resource['status'],202 condition=condition,203 )204 def __eq__(self, other):205 attributes = ['cluster_name', 'group', 'kind', 'namespace', 'name',206 'status', 'condition']207 for a in attributes:208 if getattr(self, a) != getattr(other, a):209 return False210 return True211def ParseResultFromRawResourceGroups(raw_resource_groups):212 """Parse from RawResourceGroup.213 Args:214 raw_resource_groups: List of RawResourceGroup215 Returns:216 List of ListItems217 """218 resources = []219 for raw_rg in raw_resource_groups:220 cluster = raw_rg.cluster221 resource_statuses = raw_rg.rg_dict['status'].get('resourceStatuses', [])222 for rs in resource_statuses:223 resources.append(ListItem.FromResourceStatus(cluster, rs))...

Full Screen

Full Screen

manage_rg.py

Source:manage_rg.py Github

copy

Full Screen

1from azure.identity import AzureCliCredential2from azure.mgmt.resource import ResourceManagementClient3import os456subscription_id = os.environ["SUBSCRIPTION_ID"]7credential = AzureCliCredential()8resource_client = ResourceManagementClient(credential, subscription_id)910def list_rg():11 group_list = resource_client.resource_groups.list()12 13 column_width = 4014 print("Resource Group".ljust(column_width) + "Location")15 print("-" * (column_width * 2))16 17 for group in list(group_list):18 print(f"{group.name:<{column_width}}{group.location}")1920def create_rg(rg_name):21 resource_client.resource_groups.create_or_update(rg_name, {"location": "westeurope"})2223def get_rg(rg_name):24 print(f"Resource group with name [{rg_name}] exists: ", end="")25 print(resource_client.resource_groups.check_existence(rg_name))2627def update_rg(tag, value):28 rg_name = "aino-test-rg"29 rg_params = {'location':'westeurope'}30 rg_params.update(tags={tag:value})31 resource_client.resource_groups.create_or_update(rg_name, rg_params)3233def delete_rg(rg_name):34 delete_rg = resource_client.resource_groups.begin_delete(rg_name)35 delete_rg.wait()3637def main():38# create_rg("aino-test-rg")39# list_rg()40# get_rg("aino-test-rg")41# update_rg("tag2", "value2")42 delete_rg("aino-test-rg")43 ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful