Best Python code snippet using localstack_python
tests_int_access_control.py
Source:tests_int_access_control.py  
...19    def test_get_by_id_owner_with_read_access_returns_data(20        self, get_all_workspaces_with_read_access_by_user21    ):22        data_id = self.fixture.data_collection[fixture_data.USER_1_WORKSPACE_1].id23        mock_user = _create_user("1")24        get_all_workspaces_with_read_access_by_user.return_value = [25            fixture_data.workspace_126        ]27        data = data_api.get_by_id(data_id, mock_user)28        self.assertTrue(isinstance(data, Data))29    @patch(30        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"31    )32    def test_get_by_id_owner_without_read_access_returns_data(33        self, get_all_workspaces_with_read_access_by_user34    ):35        data_id = self.fixture.data_collection[fixture_data.USER_1_WORKSPACE_1].id36        mock_user = _create_user("1")37        get_all_workspaces_with_read_access_by_user.return_value = []38        with self.assertRaises(AccessControlError):39            data_api.get_by_id(data_id, mock_user)40    @patch(41        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"42    )43    def test_get_by_id_owner_without_read_access_returns_data(44        self, get_all_workspaces_with_read_access_by_user45    ):46        data_id = self.fixture.data_collection[fixture_data.USER_1_WORKSPACE_1].id47        mock_user = _create_user("1")48        get_all_workspaces_with_read_access_by_user.return_value = []49        data = data_api.get_by_id(data_id, mock_user)50        self.assertTrue(isinstance(data, Data))51    @patch(52        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"53    )54    def test_get_by_id_user_without_read_access_raises_error(55        self, get_all_workspaces_with_read_access_by_user56    ):57        data_id = self.fixture.data_collection[fixture_data.USER_1_WORKSPACE_1].id58        mock_user = _create_user("2")59        get_all_workspaces_with_read_access_by_user.return_value = []60        with self.assertRaises(AccessControlError):61            data_api.get_by_id(data_id, mock_user)62    def test_get_by_id_owner_no_workspace_read_access_returns_data(self):63        data_id = self.fixture.data_collection[fixture_data.USER_1_NO_WORKSPACE].id64        mock_user = _create_user("1")65        data = data_api.get_by_id(data_id, mock_user)66        self.assertTrue(isinstance(data, Data))67    def test_get_by_id_not_owner_no_workspace_raises_error(self):68        data_id = self.fixture.data_collection[fixture_data.USER_1_NO_WORKSPACE].id69        mock_user = _create_user("2")70        with self.assertRaises(AccessControlError):71            data_api.get_by_id(data_id, mock_user)72class TestDataGetAll(MongoIntegrationBaseTestCase):73    fixture = fixture_data74    def test_get_all_as_superuser_returns_all_data(self):75        mock_user = _create_user("1", is_superuser=True)76        data_list = data_api.get_all(mock_user)77        self.assertTrue(len(data_list) == len(self.fixture.data_collection))78    def test_get_all_as_user_raises_error(self):79        mock_user = _create_user("1")80        with self.assertRaises(AccessControlError):81            data_api.get_all(mock_user)82class TestDataGetAllByUser(MongoIntegrationBaseTestCase):83    fixture = fixture_data84    @patch(85        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"86    )87    def test_get_all_returns_data(self, get_all_workspaces_with_read_access_by_user):88        mock_user = _create_user("1")89        get_all_workspaces_with_read_access_by_user.return_value = []90        with self.assertRaises(AccessControlError):91            data_api.get_all_except_user(mock_user)92    @patch(93        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"94    )95    def test_get_all_by_user_returns_owned_data(96        self, get_all_workspaces_with_read_access_by_user97    ):98        mock_user = _create_user("1")99        data_list = data_api.get_all_by_user(mock_user)100        get_all_workspaces_with_read_access_by_user.return_value = []101        self.assertTrue(len(data_list) == 3)102        self.assertTrue(data.id == "1" for data in data_list)103    @patch(104        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"105    )106    def test_get_all_by_user_returns_no_data_if_owns_zero(107        self, get_all_workspaces_with_read_access_by_user108    ):109        mock_user = _create_user("3")110        data_list = data_api.get_all_by_user(mock_user)111        get_all_workspaces_with_read_access_by_user.return_value = []112        self.assertTrue(len(data_list) == 0)113    def test_get_all_by_user_as_superuser_returns_own_data(self):114        mock_user = _create_user("1", is_superuser=True)115        data_list = data_api.get_all_by_user(mock_user)116        self.assertTrue(len(data_list) == 3)117        self.assertTrue(data.user_id == "1" for data in data_list)118class TestDataGetAllExceptUser(MongoIntegrationBaseTestCase):119    # NOTE: Will always fail when private data are present (data.workspace=None, data.user_id!=user.id)120    fixture = fixture_data121    @patch(122        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"123    )124    def test_get_all_except_user_raises_error_if_no_workspace_access(125        self, get_all_workspaces_with_read_access_by_user126    ):127        mock_user = _create_user("1")128        get_all_workspaces_with_read_access_by_user.return_value = []129        with self.assertRaises(AccessControlError):130            data_api.get_all_except_user(mock_user)131    @patch(132        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"133    )134    def test_get_all_except_user_raises_error_data_if_workspace_access(135        self, get_all_workspaces_with_read_access_by_user136    ):137        mock_user = _create_user("1")138        get_all_workspaces_with_read_access_by_user.return_value = [139            fixture_data.workspace_1,140            fixture_data.workspace_2,141        ]142        with self.assertRaises(AccessControlError):143            data_api.get_all_except_user(mock_user)144    def test_get_all_except_user_as_superuser_returns_others_data(self):145        mock_user = _create_user("1", is_superuser=True)146        data_list = data_api.get_all_except_user(mock_user)147        self.assertTrue(len(data_list) > 0)148        self.assertTrue(data.user_id != mock_user.id for data in data_list)149class TestDataUpsert(MongoIntegrationBaseTestCase):150    # TODO: can not test without mock for GridFS151    pass152class TestDataExecuteQuery(MongoIntegrationBaseTestCase):153    fixture = fixture_data154    @patch(155        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"156    )157    def test_execute_query_returns_data(158        self, get_all_workspaces_with_read_access_by_user159    ):160        mock_user = _create_user("3")161        get_all_workspaces_with_read_access_by_user.return_value = [162            fixture_data.workspace_1163        ]164        data_list = data_api.execute_query({}, mock_user)165        self.assertTrue(len(data_list) > 0)166        self.assertTrue(all(isinstance(data, Data) for data in data_list))167    @patch(168        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"169    )170    def test_execute_query_returns_data_in_workspace_1(171        self, get_all_workspaces_with_read_access_by_user172    ):173        mock_user = _create_user("3")174        get_all_workspaces_with_read_access_by_user.return_value = [175            fixture_data.workspace_1176        ]177        data_list = data_api.execute_query({}, mock_user)178        self.assertTrue(len(data_list) == 2)179        self.assertTrue(data.workspace == "1" for data in data_list)180    @patch(181        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"182    )183    def test_execute_query_returns_data_in_workspace_2(184        self, get_all_workspaces_with_read_access_by_user185    ):186        mock_user = _create_user("3")187        get_all_workspaces_with_read_access_by_user.return_value = [188            fixture_data.workspace_1189        ]190        data_list = data_api.execute_query({}, mock_user)191        self.assertTrue(len(data_list) == 2)192        self.assertTrue(data.workspace == "2" for data in data_list)193    @patch(194        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"195    )196    def test_execute_query_returns_data_in_workspace_1_and_2(197        self, get_all_workspaces_with_read_access_by_user198    ):199        mock_user = _create_user("3")200        get_all_workspaces_with_read_access_by_user.return_value = [201            fixture_data.workspace_1,202            fixture_data.workspace_2,203        ]204        data_list = data_api.execute_query({}, mock_user)205        self.assertTrue(len(data_list) == 3)206        self.assertTrue(207            data.workspace == "1" or data.workspace == "2" for data in data_list208        )209    @patch(210        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"211    )212    def test_execute_query_force_workspace_1_returns_data_from_workspace_1(213        self, get_all_workspaces_with_read_access_by_user214    ):215        mock_user = _create_user("3")216        get_all_workspaces_with_read_access_by_user.return_value = [217            fixture_data.workspace_1218        ]219        data_list = data_api.execute_query(220            {"workspace": fixture_data.workspace_1.id}, mock_user221        )222        self.assertTrue(len(data_list) == 2)223        self.assertTrue(data.workspace == "1" for data in data_list)224    @patch(225        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"226    )227    def test_execute_query_force_workspace_1_does_not_return_data_if_no_access(228        self, get_all_workspaces_with_read_access_by_user229    ):230        mock_user = _create_user("3")231        get_all_workspaces_with_read_access_by_user.return_value = []232        data_list = data_api.execute_query(233            {"workspace": fixture_data.workspace_1.id}, mock_user234        )235        self.assertTrue(len(data_list) == 0)236    @patch(237        "core_main_app.components.workspace.api.get_all_workspaces_with_read_access_by_user"238    )239    def test_execute_query_force_workspace_none_does_not_return_data_if_no_access(240        self, get_all_workspaces_with_read_access_by_user241    ):242        mock_user = _create_user("3")243        get_all_workspaces_with_read_access_by_user.return_value = []244        data_list = data_api.execute_query({"workspace": None}, mock_user)245        self.assertTrue(len(data_list) == 0)246    def test_execute_query_as_superuser_returns_all_data(self):247        mock_user = _create_user("1", is_superuser=True)248        data_list = data_api.execute_query({}, mock_user)249        self.assertTrue(len(data_list) == 5)250class TestDataDelete(MongoIntegrationBaseTestCase):251    fixture = fixture_data252    @unittest.skip("GridFS not supported by mongomock")253    @patch(254        "core_main_app.components.workspace.api.get_all_workspaces_with_write_access_by_user"255    )256    def test_delete_own_data_in_accessible_workspace_deletes_data(257        self, get_all_workspaces_with_write_access_by_user258    ):259        mock_user = _create_user("1")260        get_all_workspaces_with_write_access_by_user.return_value = [261            fixture_data.workspace_1262        ]263        data_api.delete(264            fixture_data.data_collection[fixture_data.USER_1_WORKSPACE_1], mock_user265        )266    @unittest.skip("GridFS not supported by mongomock")267    @patch(268        "core_main_app.components.workspace.api.get_all_workspaces_with_write_access_by_user"269    )270    def test_delete_own_data_in_not_accessible_workspace_deletes_data(271        self, get_all_workspaces_with_write_access_by_user272    ):273        mock_user = _create_user("1")274        get_all_workspaces_with_write_access_by_user.return_value = []275        data_api.delete(276            fixture_data.data_collection[fixture_data.USER_1_WORKSPACE_1], mock_user277        )278    @unittest.skip("GridFS not supported by mongomock")279    @patch(280        "core_main_app.components.workspace.api.get_all_workspaces_with_write_access_by_user"281    )282    def test_delete_others_data_in_accessible_workspace_deletes_data(283        self, get_all_workspaces_with_write_access_by_user284    ):285        mock_user = _create_user("1")286        get_all_workspaces_with_write_access_by_user.return_value = [287            fixture_data.workspace_2288        ]289        data_api.delete(290            fixture_data.data_collection[fixture_data.USER_2_WORKSPACE_2], mock_user291        )292    @patch(293        "core_main_app.components.workspace.api.get_all_workspaces_with_write_access_by_user"294    )295    def test_delete_others_data_not_accessible_workspace_raises_error(296        self, get_all_workspaces_with_write_access_by_user297    ):298        mock_user = _create_user("1")299        get_all_workspaces_with_write_access_by_user.return_value = [300            fixture_data.workspace_1301        ]302        with self.assertRaises(AccessControlError):303            data_api.delete(304                fixture_data.data_collection[fixture_data.USER_2_WORKSPACE_2], mock_user305            )306    @unittest.skip("GridFS not supported by mongomock")307    @patch(308        "core_main_app.components.workspace.api.get_all_workspaces_with_write_access_by_user"309    )310    def test_delete_own_data_not_in_workspace_deletes_data(311        self, get_all_workspaces_with_write_access_by_user312    ):313        mock_user = _create_user("1")314        get_all_workspaces_with_write_access_by_user.return_value = []315        with self.assertRaises(AccessControlError):316            data_api.delete(317                fixture_data.data_collection[fixture_data.USER_1_WORKSPACE_1], mock_user318            )319    @patch(320        "core_main_app.components.workspace.api.get_all_workspaces_with_write_access_by_user"321    )322    def test_delete_others_data_not_in_workspace_raises_error(323        self, get_all_workspaces_with_write_access_by_user324    ):325        mock_user = _create_user("1")326        get_all_workspaces_with_write_access_by_user.return_value = []327        with self.assertRaises(AccessControlError):328            data_api.delete(329                fixture_data.data_collection[fixture_data.USER_2_NO_WORKSPACE],330                mock_user,331            )332class TestDataChangeOwner(MongoIntegrationBaseTestCase):333    fixture = fixture_data334    def test_change_owner_from_owner_to_owner_ok(self):335        mock_owner = _create_user("1")336        data_api.change_owner(337            document=fixture_data.data_collection[fixture_data.USER_1_NO_WORKSPACE],338            new_user=mock_owner,339            user=mock_owner,340        )341    def test_change_owner_from_owner_to_user_ok(self):342        mock_owner = _create_user("1")343        mock_user = _create_user("2")344        data_api.change_owner(345            document=fixture_data.data_collection[fixture_data.USER_1_NO_WORKSPACE],346            new_user=mock_user,347            user=mock_owner,348        )349    def test_change_owner_from_user_to_user_raises_exception(self):350        mock_owner = _create_user("1")351        mock_user = _create_user("2")352        with self.assertRaises(AccessControlError):353            data_api.change_owner(354                document=fixture_data.data_collection[fixture_data.USER_1_NO_WORKSPACE],355                new_user=mock_owner,356                user=mock_user,357            )358    def test_change_owner_as_superuser_ok(self):359        mock_user = _create_user("2", is_superuser=True)360        data_api.change_owner(361            document=fixture_data.data_collection[fixture_data.USER_1_NO_WORKSPACE],362            new_user=mock_user,363            user=mock_user,364        )365def _create_user(user_id, is_superuser=False):...Export_SDDC_config.py
Source:Export_SDDC_config.py  
1### Package Imports ####2import requests3import json4import argparse5### Ready arguments from command line ###6parser = argparse.ArgumentParser(description='Export user created NSX-T Firewall rules and objects for a given VMC SDDC.')7parser.add_argument('orgid')8parser.add_argument('sddcid')9parser.add_argument('refreshtoken')10args = parser.parse_args()11### Access Token ###12authurl = 'https://console.cloud.vmware.com/csp/gateway/am/api/auth/api-tokens/authorize?refresh_token=%s' %(args.refreshtoken)13headers = {'Accept': 'application/json'}14payload = {}15authresp = requests.post(authurl,headers=headers,data=payload)16authjson = json.loads(authresp.text)17token = authjson["access_token"]18### Get ReverseProxy URL ###19infourl = 'https://vmc.vmware.com/vmc/api/orgs/%s/sddcs/%s' %(args.orgid,args.sddcid)20headers = {'csp-auth-token': token, 'content-type': 'application/json'}21payload = {}22sddcresp = requests.get(infourl,headers=headers,data=payload)23sddcjson = json.loads(sddcresp.text)24srevproxyurl = sddcjson["resource_config"]["nsx_api_public_endpoint_url"]25curCursor = ''26pageSize = 100027### Source SDDC URL's ###28smgwgroupsurl = '%s/policy/api/v1/infra/domains/mgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)29scgwgroupsurl = '%s/policy/api/v1/infra/domains/cgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)30scgwurl = '%s/policy/api/v1/infra/domains/cgw/gateway-policies/default/rules' %(srevproxyurl)31smgwurl = '%s/policy/api/v1/infra/domains/mgw/gateway-policies/default/rules' %(srevproxyurl)32sservicesurl = '%s/policy/api/v1/infra/services?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)33sdfwurl = '%s/policy/api/v1/infra/domains/cgw/communication-maps' %(srevproxyurl)34ikeprofurl = '%s/policy/api/v1/infra/ipsec-vpn-ike-profiles' %(srevproxyurl)35tunnelprofurl = '%s/policy/api/v1/infra/ipsec-vpn-tunnel-profiles' %(srevproxyurl)36bgpneighborurl =  '%s/policy/api/v1/infra/tier-0s/vmc/locale-services/default/bgp/neighbors' %(srevproxyurl)37l3vpnsessionurl = '%s/policy/api/v1/infra/tier-0s/vmc/locale-services/default/ipsec-vpn-services/default/sessions' %(srevproxyurl)38l2vpnsessionurl = '%s/policy/api/v1/infra/tier-0s/vmc/locale-services/default/l2vpn-services/default/sessions' %(srevproxyurl)39headers = {'csp-auth-token': token, 'content-type': 'application/json'}40sfwDump = open("sourceRules.json", "a+")41### Get Source MGW Groups ###42print("{\"MGWGroups\": [")43mgroupsresp = requests.get(smgwgroupsurl,headers=headers)44mg = json.loads(mgroupsresp.text)45mgroups = mg["results"]46if mg["result_count"] > pageSize:47    curCursor = mg["cursor"]48    smgwgroupsurl = '%s/policy/api/v1/infra/domains/mgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)49    while "cursor" in mg:50        mgroupsresp = requests.get(smgwgroupsurl,headers=headers)51        mg = json.loads(mgroupsresp.text)52        mgroups = mg["results"]53        if "cursor" in mg:54            curCursor = mg["cursor"]55        smgwgroupsurl = '%s/policy/api/v1/infra/domains/mgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)56        ### Filter out system groups ###57        index = 058        count = 059        for group in mgroups:60            if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":61                count = count + 162        for group in mgroups:63            if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":64                #print("Index..... Item Value: "+repr(index),mgroups[index])65                print(json.dumps(group,indent=4),end = "")66                if(count-1 > index):67                    print(",")68                index = index + 169index = 070count = 071for group in mgroups:72    if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":73	    count = count + 174for group in mgroups:75            if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":76                #print("Index..... Item Value: "+repr(index),mgroups[index])77                print(json.dumps(group,indent=4),end="")78                if(count-1 > index):79                    print(",")80                index = index + 181print("],")82### Get Source CGW Groups ###83cgroupsresp = requests.get(scgwgroupsurl,headers=headers)84cg = json.loads(cgroupsresp.text)85cgroups = cg["results"]86### Filter out system groups ###87print("\"CGWGroups\": [")88if cg["result_count"] > pageSize:89    curCursor = cg["cursor"]90    scgwgroupsurl = '%s/policy/api/v1/infra/domains/cgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)91    while "cursor" in cg:92        cgroupsresp = requests.get(scgwgroupsurl,headers=headers)93        cg = json.loads(cgroupsresp.text)94        cgroups = cg["results"]95        if "cursor" in cg:96            curCursor = cg["cursor"]97        scgwgroupsurl = '%s/policy/api/v1/infra/domains/cgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)98        ### Filter out system groups ###99        index = 0100        count = 0101        for group in cgroups:102            if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":103                count = count + 1104        for group in cgroups:105            if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":106                print(json.dumps(group,indent=4),end="")107                if(count-1 > index):108                    print(",")109                index = index + 1110index = 0111count = 0112for group in cgroups:113    if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":114	    count = count + 1115for group in cgroups:116    if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":117        print(json.dumps(group,indent=4),end="")118        if(count-1 > index):119            print(",")120        index = index + 1121print("],")122### Get Source SDDC Firewall Services ###123servicesresp = requests.get(sservicesurl,headers=headers)124srv = json.loads(servicesresp.text)125services = srv["results"]126user_service_count = 0127### Filter out system Services ###128for service in services:129    if service["_create_user"]!= "admin" and service["_create_user"]!="admin;admin" and service["_create_user"]!="system":130        user_service_count = user_service_count + 1131#print("Result Count for Services: "+repr(user_service_count))132if(user_service_count > 0):133    print("\"Services\": [")134if srv["result_count"] > pageSize:135    curCursor = srv["cursor"]136    sservicesurl = '%s/policy/api/v1/infra/services?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)137    while "cursor" in srv:138        servicesresp = requests.get(sservicesurl,headers=headers)139        srv = json.loads(servicesresp.text)140        services = srv["results"]141        if "cursor" in srv:142            curCursor = srv["cursor"]143        sservicesurl = '%s/policy/api/v1/infra/services?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)144        ### Filter out system services ###145        index = 0146        count = 0147        for service in services:148            if service["_create_user"]!= "admin" and service["_create_user"]!="admin;admin" and service["_create_user"]!="system":149                count = count + 1150        for service in services:151            if service["_create_user"]!= "admin" and service["_create_user"]!="admin;admin" and service["_create_user"]!="system":152                print(json.dumps(service,indent=4),end="")153                if(count-1 > index):154                    print(",")155                index = index + 1156index = 0157count = 0158for service in services:159    if service["_create_user"]!= "admin" and service["_create_user"]!="admin;admin" and service["_create_user"]!="system":160	    count = count + 1				161for service in services:162    if service["_create_user"]!= "admin" and service["_create_user"]!="admin;admin" and service["_create_user"]!="system":163        print(json.dumps(service,indent=4))164        if(count-1 > index):165            print(",")166        index = index + 1167if(user_service_count > 0):168    print("],")169### Get Management Gateway Firewall Rules ###170mgwresponse = requests.get(smgwurl,headers=headers)171m = json.loads(mgwresponse.text)172mgwrules = m["results"]173### Filter out system Rules ###174curCursor = ''175print("\"MGWRules\": [")176index = 0177count = 0178for rule in mgwrules:179    if rule["_create_user"]!= "admin" and rule["_create_user"]!="admin;admin" and rule["_create_user"]!="system":180	    count = count + 1181for rule in mgwrules:182    if rule["_create_user"]!= "admin" and rule["_create_user"]!="admin;admin" and rule["_create_user"]!="system":183        print(json.dumps(rule,indent=4),end="")184        if(count-1 > index):185            print(",")186        index = index + 1187print("],")188### Get Compute Gateway Firewall Rules ###189cgwresponse = requests.get(scgwurl,headers=headers)190c = json.loads(cgwresponse.text)191cgwrules = c["results"]192### Filter out system Rules ###193print("\"CGWRules\": [")194index = 0195count = 0196for rule in cgwrules:197    if rule["_create_user"]!= "admin" and rule["_create_user"]!="admin;admin" and rule["_create_user"]!="system":198	    count = count + 1199for rule in cgwrules:200    if rule["_create_user"]!= "admin" and rule["_create_user"]!="admin;admin" and rule["_create_user"]!="system":201        print(json.dumps(rule,indent=4),end="")202        if(count-1 > index):203            print(",")204        index = index + 1205print("],")206### Get Source Distributed Firewall Rules ###207#print("DFW URL: "+sdfwurl)208dfwresponse = requests.get(sdfwurl,headers=headers)209d = json.loads(dfwresponse.text)210#print('DFW Comms Map: '+str(d))211cmaps = d["results"]212print("\"DFWRules\": [")213index = 0214count = 0215for cmap in cmaps:216    requrl = "%s/%s" %(sdfwurl,cmap["id"])217    cmapDetails = requests.get(requrl,headers=headers)218    count = count + 1219for cmap in cmaps:220    requrl = "%s/%s" %(sdfwurl,cmap["id"])221    cmapDetails = requests.get(requrl,headers=headers)222    cmapd = json.loads(cmapDetails.text)223    print(cmapDetails.text,end="")224    if(count-1 > index):225        print(",")226    index = index + 1227print("],")228    229### Get VPN IKE Profiles ###230ikeprofresponse = requests.get(ikeprofurl,headers=headers)231ikep = json.loads(ikeprofresponse.text)232ikeprofiles = ikep["results"]233### Filter out system profiles ###234curCursor = ''235print("\"IKEProfiles\": [")236index = 0237count = 0238for ikeprofile in ikeprofiles:239    if ikeprofile["_create_user"]!= "admin" and ikeprofile["_create_user"]!="admin;admin" and ikeprofile["_create_user"]!="system":240	    count = count + 1241for ikeprofile in ikeprofiles:242    if ikeprofile["_create_user"]!= "admin" and ikeprofile["_create_user"]!="admin;admin" and ikeprofile["_create_user"]!="system":243        print(json.dumps(ikeprofile,indent=4),end="")244        if(count-1 > index):245            print(",")246    index = index + 1247print("],")248		249### Get VPN Tunnel Profiles ###250tunprofresponse = requests.get(tunnelprofurl,headers=headers)251tunp = json.loads(tunprofresponse.text)252tunprofiles = tunp["results"]253### Filter out system profiles ###254curCursor = ''255print("\"TunnelProfiles\": [")256index = 0257count = 0258for tunprofile in tunprofiles:259    if tunprofile["_create_user"]!= "admin" and tunprofile["_create_user"]!="admin;admin" and tunprofile["_create_user"]!="system":260	    count = count + 1261for tunprofile in tunprofiles:262    if tunprofile["_create_user"]!= "admin" and tunprofile["_create_user"]!="admin;admin" and tunprofile["_create_user"]!="system":263        print(json.dumps(tunprofile,indent=4),end="")264        if(count-1 > index):265            print(",")266    index = index + 1267print("],")268### Get BGP Neighbors for Route Based VPN's  ###269bgpnresponse = requests.get(bgpneighborurl,headers=headers)270bgn = json.loads(bgpnresponse.text)271bgpns = bgn["results"]272### Filter out system BGP Neighbors ###273curCursor = ''274print("\"BGPNeighbors\": [")275index = 0276count = 0277for bgpn in bgpns:278    if bgpn["_create_user"]!= "admin" and bgpn["_create_user"]!="admin;admin" and bgpn["_create_user"]!="system":279	    count = count + 1280for bgpn in bgpns:281    if bgpn["_create_user"]!= "admin" and bgpn["_create_user"]!="admin;admin" and bgpn["_create_user"]!="system":282        print(json.dumps(bgpn,indent=4),end="")283        if(count-1 > index):284            print(",")285    index = index + 1286print("],")287### Get L2VPN Sessions ###288l2vpnsresponse = requests.get(l2vpnsessionurl,headers=headers)289l2v = json.loads(l2vpnsresponse.text)290l2vpns = l2v["results"]291### Filter out system profiles ###292curCursor = ''293print("\"L2VPNSessions\": [")294index = 0295count = 0296for l2vpn in l2vpns:297    if l2vpn["_create_user"]!= "admin" and l2vpn["_create_user"]!="admin;admin" and l2vpn["_create_user"]!="system":298	    count = count + 1299for l2vpn in l2vpns:300    if l2vpn["_create_user"]!= "admin" and l2vpn["_create_user"]!="admin;admin" and l2vpn["_create_user"]!="system":301        print(json.dumps(l2vpn,indent=4),end="")302        if(count-1 > index):303            print(",")304    index = index + 1305print("],")306### Get L3VPN Sessions ###307l3vpnsresponse = requests.get(l3vpnsessionurl,headers=headers)308l3v = json.loads(l3vpnsresponse.text)309l3vpns = l3v["results"]310### Filter out system profiles ###311curCursor = ''312print("\"L3VPNSessions\": [")313index = 0314count = 0315for l3vpn in l3vpns:316    if l3vpn["_create_user"]!= "admin" and l3vpn["_create_user"]!="admin;admin" and l3vpn["_create_user"]!="system":317	    count = count + 1318for l3vpn in l3vpns:319    if l3vpn["_create_user"]!= "admin" and l3vpn["_create_user"]!="admin;admin" and l3vpn["_create_user"]!="system":320        session_id = l3vpn["id"]321        l3vpnsessionpskurl = '%s/policy/api/v1/infra/tier-0s/vmc/locale-services/default/ipsec-vpn-services/default/sessions/%s?action=show_sensitive_data' %(srevproxyurl,session_id)322        l3vpnpskresponse = requests.get(l3vpnsessionpskurl,headers=headers)323        l3vpsk = json.loads(l3vpnpskresponse.text)324        l3vpn["psk"] = l3vpsk["psk"]325        print(json.dumps(l3vpn,indent=4),end="")326        if(count-1 > index):327            print(",")328    index = index + 1...Export_NSX-T_FW_config_from_an_SDDC.py
Source:Export_NSX-T_FW_config_from_an_SDDC.py  
1### Package Imports ####2import requests3import json4import argparse5### Ready arguments from command line ###6parser = argparse.ArgumentParser(description='Export user created NSX-T Firewall rules and objects for a given VMC SDDC.')7parser.add_argument('orgid')8parser.add_argument('sddcid')9parser.add_argument('refreshtoken')10args = parser.parse_args()11### Access Token ###12authurl = 'https://console.cloud.vmware.com/csp/gateway/am/api/auth/api-tokens/authorize?refresh_token=%s' %(args.refreshtoken)13headers = {'Accept': 'application/json'}14payload = {}15authresp = requests.post(authurl,headers=headers,data=payload)16authjson = json.loads(authresp.text)17token = authjson["access_token"]18### Get ReverseProxy URL ###19infourl = 'https://vmc.vmware.com/vmc/api/orgs/%s/sddcs/%s' %(args.orgid,args.sddcid)20headers = {'csp-auth-token': token, 'content-type': 'application/json'}21payload = {}22sddcresp = requests.get(infourl,headers=headers,data=payload)23sddcjson = json.loads(sddcresp.text)24srevproxyurl = sddcjson["resource_config"]["nsx_api_public_endpoint_url"]25curCursor = ''26pageSize = 100027### Source SDDC URL's ###28smgwgroupsurl = '%s/policy/api/v1/infra/domains/mgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)29scgwgroupsurl = '%s/policy/api/v1/infra/domains/cgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)30scgwurl = '%s/policy/api/v1/infra/domains/cgw/gateway-policies/default/rules' %(srevproxyurl)31smgwurl = '%s/policy/api/v1/infra/domains/mgw/gateway-policies/default/rules' %(srevproxyurl)32sservicesurl = '%s/policy/api/v1/infra/services?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)33sdfwurl = '%s/policy/api/v1/infra/domains/cgw/communication-maps' %(srevproxyurl)34ikeprofurl = '%s/policy/api/v1/infra/ipsec-vpn-ike-profiles' %(srevproxyurl)35tunnelprofurl = '%s/policy/api/v1/infra/ipsec-vpn-tunnel-profiles' %(srevproxyurl)36bgpneighborurl =  '%s/policy/api/v1/infra/tier-0s/vmc/locale-services/default/bgp/neighbors' %(srevproxyurl)37l3vpnsessionurl = '%s/policy/api/v1/infra/tier-0s/vmc/locale-services/default/ipsec-vpn-services/default/sessions' %(srevproxyurl)38headers = {'csp-auth-token': token, 'content-type': 'application/json'}39sfwDump = open("sourceRules.json", "a+")40### Get Source MGW Groups ###41print("MGW Groups")42mgroupsresp = requests.get(smgwgroupsurl,headers=headers)43mg = json.loads(mgroupsresp.text)44mgroups = mg["results"]45if mg["result_count"] > pageSize:46    curCursor = mg["cursor"]47    smgwgroupsurl = '%s/policy/api/v1/infra/domains/mgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)48    while "cursor" in mg:49        mgroupsresp = requests.get(smgwgroupsurl,headers=headers)50        mg = json.loads(mgroupsresp.text)51        mgroups = mg["results"]52        if "cursor" in mg:53            curCursor = mg["cursor"]54        smgwgroupsurl = '%s/policy/api/v1/infra/domains/mgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)55        ### Filter out system groups ###56        for group in mgroups:57            if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":58                print(json.dumps(group,indent=4))59for group in mgroups:60            if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":61                print(json.dumps(group,indent=4))62### Get Source CGW Groups ###63cgroupsresp = requests.get(scgwgroupsurl,headers=headers)64cg = json.loads(cgroupsresp.text)65cgroups = cg["results"]66### Filter out system groups ###67print("CGW Groups")68if cg["result_count"] > pageSize:69    curCursor = cg["cursor"]70    scgwgroupsurl = '%s/policy/api/v1/infra/domains/cgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)71    while "cursor" in cg:72        cgroupsresp = requests.get(scgwgroupsurl,headers=headers)73        cg = json.loads(cgroupsresp.text)74        cgroups = cg["results"]75        if "cursor" in cg:76            curCursor = cg["cursor"]77        scgwgroupsurl = '%s/policy/api/v1/infra/domains/cgw/groups?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)78        ### Filter out system groups ###79        for group in cgroups:80            if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":81                print(json.dumps(group,indent=4))82for group in cgroups:83    if group["_create_user"]!= "admin" and group["_create_user"]!="admin;admin":84        print(json.dumps(group,indent=4))85### Get Source SDDC Firewall Services ###86servicesresp = requests.get(sservicesurl,headers=headers)87srv = json.loads(servicesresp.text)88services = srv["results"]89### Filter out system Services ###90print("Services")91if srv["result_count"] > pageSize:92    curCursor = srv["cursor"]93    sservicesurl = '%s/policy/api/v1/infra/services?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)94    while "cursor" in srv:95        servicesresp = requests.get(sservicesurl,headers=headers)96        srv = json.loads(servicesresp.text)97        services = srv["results"]98        if "cursor" in srv:99            curCursor = srv["cursor"]100        sservicesurl = '%s/policy/api/v1/infra/services?page_size=%s&cursor=%s' %(srevproxyurl,pageSize,curCursor)101        ### Filter out system services ###102        for service in services:103            if service["_create_user"]!= "admin" and service["_create_user"]!="admin;admin" and service["_create_user"]!="system":104                print(json.dumps(service,indent=4))105for service in services:106    if service["_create_user"]!= "admin" and service["_create_user"]!="admin;admin" and service["_create_user"]!="system":107        print(json.dumps(service,indent=4))108### Get Management Gateway Firewall Rules ###109mgwresponse = requests.get(smgwurl,headers=headers)110m = json.loads(mgwresponse.text)111mgwrules = m["results"]112### Filter out system Rules ###113curCursor = ''114print("MGW Rules")115for rule in mgwrules:116    if rule["_create_user"]!= "admin" and rule["_create_user"]!="admin;admin" and rule["_create_user"]!="system":117        print(json.dumps(rule,indent=4))118### Get Compute Gateway Firewall Rules ###119cgwresponse = requests.get(scgwurl,headers=headers)120c = json.loads(cgwresponse.text)121cgwrules = c["results"]122### Filter out system Rules ###123print("CGW Rules")124for rule in cgwrules:125    if rule["_create_user"]!= "admin" and rule["_create_user"]!="admin;admin" and rule["_create_user"]!="system":126        print(json.dumps(rule,indent=4))127### Get Source Distributed Firewall Rules ###128dfwresponse = requests.get(sdfwurl,headers=headers)129d = json.loads(dfwresponse.text)130#print('DFW Comms Map: ')131cmaps = d["results"]132print('Distributed Firewall Rules: ')133for cmap in cmaps:134    requrl = "%s/%s" %(sdfwurl,cmap["id"])135    cmapDetails = requests.get(requrl,headers=headers)136    cmapd = json.loads(cmapDetails.text)137    print(cmapDetails.text)138    139### Get VPN IKE Profiles ###140ikeprofresponse = requests.get(ikeprofurl,headers=headers)141ikep = json.loads(ikeprofresponse.text)142ikeprofiles = ikep["results"]143### Filter out system profiles ###144curCursor = ''145print("IKE Profiles")146for ikeprofile in ikeprofiles:147    if ikeprofile["_create_user"]!= "admin" and ikeprofile["_create_user"]!="admin;admin" and ikeprofile["_create_user"]!="system":148        print(json.dumps(ikeprofile,indent=4))149		150### Get VPN Tunnel Profiles ###151tunprofresponse = requests.get(tunnelprofurl,headers=headers)152tunp = json.loads(tunprofresponse.text)153tunprofiles = tunp["results"]154### Filter out system profiles ###155curCursor = ''156print("Tunnel Profiles")157for tunprofile in tunprofiles:158    if tunprofile["_create_user"]!= "admin" and tunprofile["_create_user"]!="admin;admin" and tunprofile["_create_user"]!="system":159        print(json.dumps(tunprofile,indent=4))160### Get BGP Neighbors for Route Based VPN's  ###161bgpnresponse = requests.get(bgpneighborurl,headers=headers)162bgn = json.loads(bgpnresponse.text)163bgpns = bgn["results"]164### Filter out system BGP Neighbors ###165curCursor = ''166print("BGP Neighbors:")167for bgpn in bgpns:168    if bgpn["_create_user"]!= "admin" and bgpn["_create_user"]!="admin;admin" and bgpn["_create_user"]!="system":169        print(json.dumps(bgpn,indent=4))170### Get L3VPN Sessions ###171l3vpnsresponse = requests.get(l3vpnsessionurl,headers=headers)172l3v = json.loads(l3vpnsresponse.text)173l3vpns = l3v["results"]174### Filter out system profiles ###175curCursor = ''176print("L3VPN Sessions:")177for l3vpn in l3vpns:178    if l3vpn["_create_user"]!= "admin" and l3vpn["_create_user"]!="admin;admin" and l3vpn["_create_user"]!="system":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
