How to use tenant_id method in tempest

Best Python code snippet using tempest_python

test_l2networkApi.py

Source:test_l2networkApi.py Github

copy

Full Screen

1# vim: tabstop=4 shiftwidth=4 softtabstop=42#3# Copyright 2011 Cisco Systems, Inc. All rights reserved.4#5# Licensed under the Apache License, Version 2.0 (the "License"); you may6# not use this file except in compliance with the License. You may obtain7# a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the14# License for the specific language governing permissions and limitations15# under the License.16#17# @author: Shweta Padubidri, Cisco Systems, Inc.18#19import logging20import unittest21from quantum.common import exceptions as exc22from quantum.plugins.cisco.common import cisco_constants as const23from quantum.plugins.cisco.common import cisco_exceptions as cexc24from quantum.plugins.cisco import l2network_plugin25from quantum.plugins.cisco import l2network_plugin_configuration as conf26from quantum.plugins.cisco.db import api as db27from quantum.plugins.cisco.db import l2network_db as cdb28LOG = logging.getLogger('quantum.tests.test_core_api_func')29class CoreAPITestFunc(unittest.TestCase):30 def test_create_network(self, net_tenant_id=None, net_name=None):31 """32 Tests creation of new Virtual Network.33 """34 LOG.debug("test_create_network - START")35 if net_tenant_id:36 tenant_id = net_tenant_id37 else:38 tenant_id = self.tenant_id39 if net_name:40 network_name = net_name41 else:42 network_name = self.network_name43 new_net_dict = self._l2network_plugin.create_network(44 tenant_id, network_name)45 net = db.network_get(new_net_dict[const.NET_ID])46 self.assertEqual(net[const.NETWORKNAME], network_name)47 self.assertEqual(new_net_dict[const.NET_NAME], network_name)48 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])49 LOG.debug("test_create_network - END")50 def test_delete_network(self, net_tenant_id=None):51 """52 Tests deletion of a Virtual Network.53 """54 LOG.debug("test_delete_network - START")55 if net_tenant_id:56 tenant_id = net_tenant_id57 else:58 tenant_id = self.tenant_id59 new_net_dict = self._l2network_plugin.create_network(60 tenant_id, self.network_name)61 delete_net_dict = self._l2network_plugin.delete_network(62 tenant_id, new_net_dict[const.NET_ID])63 self.assertRaises(exc.NetworkNotFound, db.network_get,64 new_net_dict[const.NET_ID])65 self.assertEqual(66 new_net_dict[const.NET_ID], delete_net_dict[const.NET_ID])67 LOG.debug("test_delete_network - END")68 def test_delete_networkDNE(self, net_tenant_id=None, net_id='0005'):69 """70 Tests deletion of a Virtual Network when Network does not exist.71 """72 LOG.debug("test_delete_network_not_found - START")73 if net_tenant_id:74 tenant_id = net_tenant_id75 else:76 tenant_id = self.tenant_id77 self.assertRaises(78 exc.NetworkNotFound, self._l2network_plugin.delete_network,79 tenant_id, net_id)80 LOG.debug("test_delete_network_not_found - END")81 def test_delete_networkInUse(82 self, tenant_id='test_tenant', instance_tenant_id='nova',83 nova_user_id='novaadmin', instance_id=10,84 vif_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):85 """86 Tests deletion of a Virtual Network when Network is in Use.87 """88 LOG.debug("test_delete_networkInUse - START")89 new_net_dict = self._l2network_plugin.create_network(90 tenant_id, self.network_name)91 port_dict = self._l2network_plugin.create_port(92 tenant_id, new_net_dict[const.NET_ID], self.state)93 instance_desc = {'project_id': tenant_id,94 'user_id': nova_user_id}95 host_list = self._l2network_plugin.schedule_host(instance_tenant_id,96 instance_id,97 instance_desc)98 instance_vif_desc = {'project_id': tenant_id,99 'user_id': nova_user_id,100 'vif_id': vif_id}101 vif_description = self._l2network_plugin.associate_port(102 instance_tenant_id, instance_id,103 instance_vif_desc)104 self.assertRaises(exc.NetworkInUse,105 self._l2network_plugin.delete_network, tenant_id,106 new_net_dict[const.NET_ID])107 self.tearDownNetworkPortInterface(108 tenant_id, instance_tenant_id, instance_id, instance_vif_desc,109 new_net_dict[const.NET_ID], port_dict[const.PORT_ID])110 LOG.debug("test_delete_networkInUse - END")111 def test_show_network(self, net_tenant_id=None):112 """113 Tests display of details of a Virtual Network .114 """115 LOG.debug("test_show_network - START")116 if net_tenant_id:117 tenant_id = net_tenant_id118 else:119 tenant_id = self.tenant_id120 new_net_dict = self._l2network_plugin.create_network(121 tenant_id, self.network_name)122 result_net_dict = self._l2network_plugin.get_network_details(123 tenant_id, new_net_dict[const.NET_ID])124 net = db.network_get(new_net_dict[const.NET_ID])125 self.assertEqual(net[const.UUID], new_net_dict[const.NET_ID])126 self.assertEqual(127 new_net_dict[const.NET_ID], result_net_dict[const.NET_ID])128 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])129 LOG.debug("test_show_network - END")130 def test_show_networkDNE(self, net_tenant_id=None, net_id='0005'):131 """132 Tests display of a Virtual Network when Network does not exist.133 """134 LOG.debug("test_show_network_not_found - START")135 if net_tenant_id:136 tenant_id = net_tenant_id137 else:138 tenant_id = self.tenant_id139 self.assertRaises(exc.NetworkNotFound,140 self._l2network_plugin.get_network_details,141 tenant_id, net_id)142 LOG.debug("test_show_network_not_found - END")143 def test_update_network(self, net_tenant_id=None,144 new_name='new_test_network'):145 """146 Tests rename of a Virtual Network .147 """148 LOG.debug("test_update_network - START")149 if net_tenant_id:150 tenant_id = net_tenant_id151 else:152 tenant_id = self.tenant_id153 new_net_dict = self._l2network_plugin.create_network(154 tenant_id, self.network_name)155 rename_net_dict = self._l2network_plugin.update_network(156 tenant_id, new_net_dict[const.NET_ID],157 name=new_name)158 net = db.network_get(new_net_dict[const.NET_ID])159 self.assertEqual(net[const.NETWORKNAME], new_name)160 self.assertEqual(new_name, rename_net_dict[const.NET_NAME])161 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])162 LOG.debug("test_update_network - END")163 def test_update_networkDNE(self, net_tenant_id=None,164 net_id='0005', new_name='new_test_network'):165 """166 Tests update of a Virtual Network when Network does not exist.167 """168 LOG.debug("test_update_network_not_found - START")169 if net_tenant_id:170 tenant_id = net_tenant_id171 else:172 tenant_id = self.tenant_id173 self.assertRaises(exc.NetworkNotFound,174 self._l2network_plugin.update_network,175 tenant_id, net_id, name=new_name)176 LOG.debug("test_update_network_not_found - END")177 def test_list_networks(self, tenant_id='test_network'):178 """179 Tests listing of all the Virtual Networks .180 """181 LOG.debug("test_list_networks - START")182 new_net_dict = self._l2network_plugin.create_network(183 tenant_id, self.network_name)184 new_net_dict2 = self._l2network_plugin.create_network(185 tenant_id, 'test_net2')186 net_list = self._l2network_plugin.get_all_networks(tenant_id)187 net_temp_list = [new_net_dict, new_net_dict2]188 networks_list = db.network_list(tenant_id)189 new_networks_list = []190 for network in networks_list:191 new_network_dict = self._make_net_dict(network[const.UUID],192 network[const.NETWORKNAME],193 [])194 new_networks_list.append(new_network_dict)195 self.assertEqual(len(net_list), 2)196 self.assertTrue(net_list[0] in net_temp_list)197 self.assertTrue(net_list[1] in net_temp_list)198 self.assertTrue(new_networks_list[0] in net_temp_list)199 self.assertTrue(new_networks_list[1] in net_temp_list)200 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])201 self.tearDownNetwork(tenant_id, new_net_dict2[const.NET_ID])202 LOG.debug("test_list_networks - END")203 def test_list_ports(self, tenant_id='test_network'):204 """205 Tests listing of all the Ports.206 """207 LOG.debug("test_list_ports - START")208 new_net_dict = self._l2network_plugin.create_network(209 tenant_id, self.network_name)210 port_dict = self._l2network_plugin.create_port(211 tenant_id, new_net_dict[const.NET_ID],212 self.state)213 port_dict2 = self._l2network_plugin.create_port(214 tenant_id, new_net_dict[const.NET_ID],215 self.state)216 port_list = self._l2network_plugin.get_all_ports(217 tenant_id, new_net_dict[const.NET_ID])218 port_temp_list = [port_dict, port_dict2]219 network = db.network_get(new_net_dict[const.NET_ID])220 ports_list = network[const.NETWORKPORTS]221 ports_on_net = []222 for port in ports_list:223 new_port = self._make_port_dict(port[const.UUID],224 port[const.PORTSTATE],225 port[const.NETWORKID],226 port[const.INTERFACEID])227 ports_on_net.append(new_port)228 self.assertEqual(len(port_list), 2)229 self.assertTrue(port_list[0] in port_temp_list)230 self.assertTrue(port_list[1] in port_temp_list)231 self.assertTrue(ports_on_net[0] in port_temp_list)232 self.assertTrue(ports_on_net[1] in port_temp_list)233 self.tearDownPortOnly(tenant_id, new_net_dict[const.NET_ID],234 port_dict[const.PORT_ID])235 self.tearDownNetworkPort(tenant_id, new_net_dict[const.NET_ID],236 port_dict2[const.PORT_ID])237 LOG.debug("test_list_ports - END")238 def test_create_port(self, tenant_id='test_network',239 state=const.PORT_UP):240 """241 Tests creation of Ports.242 """243 LOG.debug("test_create_port - START")244 new_net_dict = self._l2network_plugin.create_network(245 tenant_id, self.network_name)246 port_dict = self._l2network_plugin.create_port(247 tenant_id, new_net_dict[const.NET_ID], state)248 port = db.port_get(new_net_dict[const.NET_ID],249 port_dict[const.PORT_ID])250 self.assertEqual(port_dict[const.PORT_STATE], state)251 self.assertEqual(port_dict[const.NET_ID], new_net_dict[const.NET_ID])252 self.assertEqual(port[const.PORTSTATE], state)253 self.assertEqual(port[const.NETWORKID], new_net_dict[const.NET_ID])254 self.tearDownNetworkPort(tenant_id, new_net_dict[const.NET_ID],255 port_dict[const.PORT_ID])256 LOG.debug("test_create_port - END")257 def test_create_port_network_DNE(258 self, net_tenant_id=None, net_id='0005', state=const.PORT_UP):259 """260 Tests creation of Ports when network does not exist.261 """262 LOG.debug("test_create_port_network_DNE - START")263 if net_tenant_id:264 tenant_id = net_tenant_id265 else:266 tenant_id = self.tenant_id267 self.assertRaises(exc.NetworkNotFound,268 self._l2network_plugin.create_port,269 tenant_id, net_id, state)270 LOG.debug("test_create_port_network_DNE - END:")271 def test_delete_port(self, tenant_id='test_tenant',272 state=const.PORT_UP):273 """274 Tests deletion of Ports275 """276 LOG.debug("test_delete_port - START")277 new_net_dict = self._l2network_plugin.create_network(278 tenant_id, self.network_name)279 port_dict = self._l2network_plugin.create_port(280 tenant_id, new_net_dict[const.NET_ID],281 state)282 delete_port_dict = self._l2network_plugin.delete_port(283 tenant_id, new_net_dict[const.NET_ID],284 port_dict[const.PORT_ID])285 self.assertRaises(exc.PortNotFound, db.port_get,286 new_net_dict[const.NET_ID], port_dict[const.PORT_ID])287 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])288 self.assertEqual(delete_port_dict[const.PORT_ID],289 port_dict[const.PORT_ID])290 LOG.debug("test_delete_port - END")291 def test_delete_port_networkDNE(self, tenant_id='test_tenant',292 net_id='0005', port_id='p0005'):293 """294 Tests deletion of Ports when network does not exist.295 """296 LOG.debug("test_delete_port_networkDNE - START")297 self.assertRaises(exc.NetworkNotFound,298 self._l2network_plugin.delete_port, tenant_id,299 net_id, port_id)300 LOG.debug("test_delete_port_networkDNE - END")301 def test_delete_portDNE(self, tenant_id='test_tenant', port_id='p0005'):302 """303 Tests deletion of Ports when port does not exist.304 """305 LOG.debug("test_delete_portDNE - START")306 new_net_dict = self._l2network_plugin.create_network(307 tenant_id, self.network_name)308 self.assertRaises(exc.PortNotFound, self._l2network_plugin.delete_port,309 tenant_id, new_net_dict[const.NET_ID], port_id)310 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])311 LOG.debug("test_delete_portDNE - END")312 def test_delete_portInUse(313 self, tenant_id='test_tenant', instance_tenant_id='nova',314 nova_user_id='novaadmin', instance_id=10,315 vif_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):316 """317 Tests deletion of Ports when port is in Use.318 """319 LOG.debug("test_delete_portInUse - START")320 new_net_dict = self._l2network_plugin.create_network(321 tenant_id, self.network_name)322 port_dict = self._l2network_plugin.create_port(323 tenant_id, new_net_dict[const.NET_ID],324 self.state)325 instance_desc = {'project_id': tenant_id,326 'user_id': nova_user_id}327 host_list = self._l2network_plugin.schedule_host(instance_tenant_id,328 instance_id,329 instance_desc)330 instance_vif_desc = {'project_id': tenant_id,331 'user_id': nova_user_id,332 'vif_id': vif_id}333 vif_description = self._l2network_plugin.associate_port(334 instance_tenant_id, instance_id,335 instance_vif_desc)336 self.assertRaises(exc.PortInUse,337 self._l2network_plugin.delete_port, tenant_id,338 new_net_dict[const.NET_ID], port_dict[const.PORT_ID])339 self.tearDownNetworkPortInterface(340 tenant_id, instance_tenant_id, instance_id, instance_vif_desc,341 new_net_dict[const.NET_ID], port_dict[const.PORT_ID])342 LOG.debug("test_delete_portInUse - END")343 def test_update_port(self, tenant_id='test_tenant',344 state=const.PORT_DOWN):345 """346 Tests updation of Ports.347 """348 LOG.debug("test_update_port - START")349 new_net_dict = self._l2network_plugin.create_network(350 tenant_id, self.network_name)351 port_dict = self._l2network_plugin.create_port(352 tenant_id, new_net_dict[const.NET_ID],353 self.state)354 update_port_dict = self._l2network_plugin.update_port(355 tenant_id, new_net_dict[const.NET_ID],356 port_dict[const.PORT_ID], state=state)357 new_port = db.port_get(new_net_dict[const.NET_ID],358 port_dict[const.PORT_ID])359 self.assertEqual(new_port[const.PORTSTATE], state)360 self.assertEqual(update_port_dict[const.PORT_STATE], state)361 self.tearDownNetworkPort(tenant_id, new_net_dict[const.NET_ID],362 port_dict[const.PORT_ID])363 LOG.debug("test_update_port - END")364 def test_update_port_networkDNE(self, tenant_id='test_tenant',365 net_id='0005', port_id='p0005'):366 """367 Tests updation of Ports when network does not exist.368 """369 LOG.debug("test_update_port_networkDNE - START")370 self.assertRaises(exc.NetworkNotFound,371 self._l2network_plugin.update_port, tenant_id,372 net_id, port_id, state=const.PORT_UP)373 LOG.debug("test_update_port_networkDNE - END")374 def test_update_portDNE(self, tenant_id='test_tenant', port_id='p0005'):375 """376 Tests updation of Ports when port does not exist.377 """378 LOG.debug("test_update_portDNE - START")379 new_net_dict = self._l2network_plugin.create_network(380 tenant_id, self.network_name)381 self.assertRaises(382 exc.PortNotFound, self._l2network_plugin.update_port, tenant_id,383 new_net_dict[const.NET_ID], port_id, state=const.PORT_UP)384 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])385 LOG.debug("test_update_portDNE - END")386 def test_show_port(self, tenant_id='test_tenant'):387 """388 Tests display of Ports389 """390 LOG.debug("test_show_port - START")391 new_net_dict = self._l2network_plugin.create_network(392 tenant_id, self.network_name)393 port_dict = self._l2network_plugin.create_port(394 tenant_id, new_net_dict[const.NET_ID],395 self.state)396 get_port_dict = self._l2network_plugin.get_port_details(397 tenant_id, new_net_dict[const.NET_ID],398 port_dict[const.PORT_ID])399 port = db.port_get(new_net_dict[const.NET_ID],400 port_dict[const.PORT_ID])401 self.assertEqual(port[const.PORTSTATE], self.state)402 self.assertEqual(get_port_dict[const.PORT_STATE], self.state)403 self.tearDownNetworkPort(tenant_id, new_net_dict[const.NET_ID],404 port_dict[const.PORT_ID])405 LOG.debug("test_show_port - END")406 def test_show_port_networkDNE(self, tenant_id='test_tenant',407 net_id='0005', port_id='p0005'):408 """409 Tests display of Ports when network does not exist410 """411 LOG.debug("test_show_port_networkDNE - START")412 self.assertRaises(exc.NetworkNotFound,413 self._l2network_plugin.get_port_details,414 tenant_id, net_id, port_id)415 LOG.debug("test_show_port_networkDNE - END")416 def test_show_portDNE(self, tenant_id='test_tenant', port_id='p0005'):417 """418 Tests display of Ports when port does not exist419 """420 LOG.debug("test_show_portDNE - START")421 new_net_dict = self._l2network_plugin.create_network(422 tenant_id, self.network_name)423 self.assertRaises(exc.PortNotFound,424 self._l2network_plugin.get_port_details, tenant_id,425 new_net_dict[const.NET_ID], port_id)426 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])427 LOG.debug("test_show_portDNE - END")428 def test_plug_interface(429 self, tenant_id='test_tenant', instance_tenant_id='nova',430 nova_user_id='novaadmin', instance_id=10,431 vif_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):432 """433 Tests attachment of interface to the port434 """435 LOG.debug("test_plug_interface - START")436 new_net_dict = self._l2network_plugin.create_network(437 tenant_id, self.network_name)438 port_dict = self._l2network_plugin.create_port(439 tenant_id, new_net_dict[const.NET_ID], self.state)440 instance_desc = {'project_id': tenant_id,441 'user_id': nova_user_id}442 host_list = self._l2network_plugin.schedule_host(instance_tenant_id,443 instance_id,444 instance_desc)445 instance_vif_desc = {'project_id': tenant_id,446 'user_id': nova_user_id,447 'vif_id': vif_id}448 vif_description = self._l2network_plugin.associate_port(449 instance_tenant_id, instance_id,450 instance_vif_desc)451 self._l2network_plugin.plug_interface(452 tenant_id, new_net_dict[const.NET_ID],453 port_dict[const.PORT_ID], vif_id)454 port = db.port_get(new_net_dict[const.NET_ID],455 port_dict[const.PORT_ID])456 self.assertEqual(port[const.INTERFACEID], vif_id)457 self.tearDownNetworkPortInterface(458 tenant_id, instance_tenant_id, instance_id, instance_vif_desc,459 new_net_dict[const.NET_ID], port_dict[const.PORT_ID])460 LOG.debug("test_plug_interface - END")461 def test_plug_interface_networkDNE(462 self, tenant_id='test_tenant', net_id='0005',463 port_id='p0005', remote_interface='new_interface'):464 """465 Tests attachment of interface network does not exist466 """467 LOG.debug("test_plug_interface_networkDNE - START")468 self.assertRaises(exc.NetworkNotFound,469 self._l2network_plugin.plug_interface, tenant_id,470 net_id, port_id, remote_interface)471 LOG.debug("test_plug_interface_networkDNE - END")472 def test_plug_interface_portDNE(self, tenant_id='test_tenant',473 port_id='p0005',474 remote_interface='new_interface'):475 """476 Tests attachment of interface port does not exist477 """478 LOG.debug("test_plug_interface_portDNE - START")479 new_net_dict = self._l2network_plugin.create_network(480 tenant_id, self.network_name)481 self.assertRaises(482 exc.PortNotFound, self._l2network_plugin.plug_interface, tenant_id,483 new_net_dict[const.NET_ID], port_id, remote_interface)484 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])485 LOG.debug("test_plug_interface_portDNE - END")486 def test_plug_interface_portInUse(487 self, tenant_id='test_tenant', instance_tenant_id='nova',488 nova_user_id='novaadmin', instance_id=10,489 vif_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc',490 remote_interface='new_interface'):491 """492 Tests attachment of new interface to the port when there is an493 existing attachment494 """495 LOG.debug("test_plug_interface_portInUse - START")496 new_net_dict = self._l2network_plugin.create_network(497 tenant_id, self.network_name)498 port_dict = self._l2network_plugin.create_port(499 tenant_id, new_net_dict[const.NET_ID], self.state)500 instance_desc = {'project_id': tenant_id,501 'user_id': nova_user_id}502 host_list = self._l2network_plugin.schedule_host(instance_tenant_id,503 instance_id,504 instance_desc)505 instance_vif_desc = {'project_id': tenant_id,506 'user_id': nova_user_id,507 'vif_id': vif_id}508 vif_description = self._l2network_plugin.associate_port(509 instance_tenant_id, instance_id,510 instance_vif_desc)511 self.assertRaises(exc.PortInUse,512 self._l2network_plugin.plug_interface, tenant_id,513 new_net_dict[const.NET_ID],514 port_dict[const.PORT_ID], remote_interface)515 self.tearDownNetworkPortInterface(516 tenant_id, instance_tenant_id, instance_id, instance_vif_desc,517 new_net_dict[const.NET_ID], port_dict[const.PORT_ID])518 LOG.debug("test_plug_interface_portInUse - END")519 def test_unplug_interface(520 self, tenant_id='test_tenant', instance_tenant_id='nova',521 nova_user_id='novaadmin', instance_id=10,522 vif_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):523 """524 Tests detaachment of an interface to a port525 """526 LOG.debug("test_unplug_interface - START")527 new_net_dict = self._l2network_plugin.create_network(528 tenant_id, self.network_name)529 port_dict = self._l2network_plugin.create_port(530 tenant_id, new_net_dict[const.NET_ID],531 self.state)532 instance_desc = {'project_id': tenant_id,533 'user_id': nova_user_id}534 host_list = self._l2network_plugin.schedule_host(instance_tenant_id,535 instance_id,536 instance_desc)537 instance_vif_desc = {'project_id': tenant_id,538 'user_id': nova_user_id,539 'vif_id': vif_id}540 vif_description = self._l2network_plugin.associate_port(541 instance_tenant_id, instance_id,542 instance_vif_desc)543 self._l2network_plugin.plug_interface(544 tenant_id, new_net_dict[const.NET_ID],545 port_dict[const.PORT_ID], vif_id)546 self._l2network_plugin.unplug_interface(547 tenant_id, new_net_dict[const.NET_ID],548 port_dict[const.PORT_ID])549 port = db.port_get(new_net_dict[const.NET_ID],550 port_dict[const.PORT_ID])551 vif_id_unplugged = vif_id + '(detached)'552 self.assertEqual(port[const.INTERFACEID], vif_id_unplugged)553 self.tearDownNetworkPortInterface(554 tenant_id, instance_tenant_id, instance_id, instance_vif_desc,555 new_net_dict[const.NET_ID], port_dict[const.PORT_ID])556 LOG.debug("test_unplug_interface - END")557 def test_unplug_interface_networkDNE(self, tenant_id='test_tenant',558 net_id='0005', port_id='p0005'):559 """560 Tests detaachment of an interface to a port, when the network does561 not exist562 """563 LOG.debug("test_unplug_interface_networkDNE - START")564 self.assertRaises(exc.NetworkNotFound,565 self._l2network_plugin.unplug_interface,566 tenant_id, net_id, port_id)567 LOG.debug("test_unplug_interface_networkDNE - END")568 def test_unplug_interface_portDNE(self, tenant_id='test_tenant',569 port_id='p0005'):570 """571 Tests detaachment of an interface to a port, when the port does572 not exist573 """574 LOG.debug("test_unplug_interface_portDNE - START")575 new_net_dict = self._l2network_plugin.create_network(tenant_id,576 self.network_name)577 self.assertRaises(exc.PortNotFound,578 self._l2network_plugin.unplug_interface, tenant_id,579 new_net_dict[const.NET_ID], port_id)580 self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])581 LOG.debug("test_unplug_interface_portDNE - END")582 def test_create_portprofile(self, net_tenant_id=None,583 net_profile_name=None, net_qos=None):584 """585 Tests creation of a port-profile586 """587 LOG.debug("test_create_portprofile - tenant id: %s - START",588 net_tenant_id)589 if net_tenant_id:590 tenant_id = net_tenant_id591 else:592 tenant_id = self.tenant_id593 if net_profile_name:594 profile_name = net_profile_name595 else:596 profile_name = self.profile_name597 if net_qos:598 qos = net_qos599 else:600 qos = self.qos601 port_profile_dict = self._l2network_plugin.create_portprofile(602 tenant_id, profile_name, qos)603 port_profile_id = port_profile_dict['profile_id']604 port_profile = cdb.get_portprofile(tenant_id, port_profile_id)605 self.assertEqual(port_profile[const.PPNAME], profile_name)606 self.assertEqual(port_profile[const.PPQOS], qos)607 self.tearDownPortProfile(tenant_id, port_profile_id)608 LOG.debug("test_create_portprofile - tenant id: %s - END",609 net_tenant_id)610 def test_delete_portprofile(self, net_tenant_id=None):611 """612 Tests deletion of a port-profile613 """614 LOG.debug("test_delete_portprofile - tenant id: %s - START",615 net_tenant_id)616 if net_tenant_id:617 tenant_id = net_tenant_id618 else:619 tenant_id = self.tenant_id620 port_profile_dict = self._l2network_plugin.create_portprofile(621 tenant_id, self.profile_name, self.qos)622 port_profile_id = port_profile_dict['profile_id']623 self._l2network_plugin.delete_portprofile(tenant_id, port_profile_id)624 self.assertRaises(Exception, cdb.get_portprofile, port_profile_id)625 LOG.debug("test_delete_portprofile - tenant id: %s - END",626 net_tenant_id)627 def test_delete_portprofileDNE(self, tenant_id='test_tenant',628 profile_id='pr0005'):629 """630 Tests deletion of a port-profile when netowrk does not exist631 """632 LOG.debug("test_delete_portprofileDNE - START")633 self.assertRaises(cexc.PortProfileNotFound,634 self._l2network_plugin.delete_portprofile,635 tenant_id, profile_id)636 LOG.debug("test_delete_portprofileDNE - END")637 def test_delete_portprofileAssociated(self, tenant_id='test_tenant'):638 """639 Tests deletion of an associatedport-profile640 """641 LOG.debug("test_delete_portprofileAssociated - START")642 port_profile_dict = self._l2network_plugin.create_portprofile(643 tenant_id, self.profile_name, self.qos)644 port_profile_id = port_profile_dict['profile_id']645 new_net_dict = self._l2network_plugin.create_network(646 tenant_id, 'test_network')647 port_dict = self._l2network_plugin.create_port(648 tenant_id, new_net_dict[const.NET_ID],649 const.PORT_UP)650 self._l2network_plugin.associate_portprofile(651 tenant_id, new_net_dict[const.NET_ID],652 port_dict[const.PORT_ID], port_profile_id)653 self.assertRaises(cexc.PortProfileInvalidDelete,654 self._l2network_plugin.delete_portprofile,655 tenant_id, port_profile_id)656 self.tearDownAssociatePortProfile(657 tenant_id, new_net_dict[const.NET_ID],658 port_dict[const.PORT_ID], port_profile_id)659 self.tearDownNetworkPort(tenant_id, new_net_dict[const.NET_ID],660 port_dict[const.PORT_ID])661 LOG.debug("test_delete_portprofileAssociated - END")662 def test_list_portprofile(self, tenant_id='test_tenant'):663 """664 Tests listing of port-profiles665 """666 LOG.debug("test_list_portprofile - tenant id: %s - START", tenant_id)667 profile_name2 = tenant_id + '_port_profile2'668 qos2 = tenant_id + 'qos2'669 port_profile_dict1 = self._l2network_plugin.create_portprofile(670 tenant_id, self.profile_name, self.qos)671 port_profile_dict2 = self._l2network_plugin.create_portprofile(672 tenant_id, profile_name2, qos2)673 port_profile_id1 = port_profile_dict1['profile_id']674 port_profile_id2 = port_profile_dict2['profile_id']675 list_all_portprofiles = self._l2network_plugin.get_all_portprofiles(676 tenant_id)677 port_profile_list = [port_profile_dict1, port_profile_dict2]678 pplist = cdb.get_all_portprofiles()679 new_pplist = []680 for pp in pplist:681 new_pp = self._make_portprofile_dict(tenant_id,682 pp[const.UUID],683 pp[const.PPNAME],684 pp[const.PPQOS])685 new_pplist.append(new_pp)686 self.assertTrue(new_pplist[0] in port_profile_list)687 self.assertTrue(new_pplist[1] in port_profile_list)688 self.tearDownPortProfile(tenant_id, port_profile_id1)689 self.tearDownPortProfile(tenant_id, port_profile_id2)690 LOG.debug("test_list_portprofile - tenant id: %s - END", tenant_id)691 def test_show_portprofile(self, net_tenant_id=None):692 """693 Tests display of a port-profile694 """695 LOG.debug("test_show_portprofile - START")696 if net_tenant_id:697 tenant_id = net_tenant_id698 else:699 tenant_id = self.tenant_id700 port_profile_dict = self._l2network_plugin.create_portprofile(701 tenant_id, self.profile_name, self.qos)702 port_profile_id = port_profile_dict['profile_id']703 result_port_profile = self._l2network_plugin.get_portprofile_details(704 tenant_id, port_profile_id)705 port_profile = cdb.get_portprofile(tenant_id, port_profile_id)706 self.assertEqual(port_profile[const.PPQOS], self.qos)707 self.assertEqual(port_profile[const.PPNAME], self.profile_name)708 self.assertEqual(result_port_profile[const.PROFILE_QOS],709 self.qos)710 self.assertEqual(result_port_profile[const.PROFILE_NAME],711 self.profile_name)712 self.tearDownPortProfile(tenant_id, port_profile_id)713 LOG.debug("test_show_portprofile - tenant id: %s - END", net_tenant_id)714 def test_show_portprofileDNE(self, tenant_id='test_tenant',715 profile_id='pr0005'):716 """717 Tests display of a port-profile when network does not exist718 """719 LOG.debug("test_show_portprofileDNE - START")720 self.assertRaises(Exception,721 self._l2network_plugin.get_portprofile_details,722 tenant_id, profile_id)723 LOG.debug("test_show_portprofileDNE - END")724 def test_rename_portprofile(self, tenant_id='test_tenant',725 new_profile_name='new_profile_name'):726 """727 Tests rename of a port-profile728 """729 LOG.debug("test_rename_portprofile - START")730 port_profile_dict = self._l2network_plugin.create_portprofile(731 tenant_id, self.profile_name, self.qos)732 port_profile_id = port_profile_dict['profile_id']733 result_port_profile_dict = self._l2network_plugin.rename_portprofile(734 tenant_id, port_profile_id, new_profile_name)735 port_profile = cdb.get_portprofile(tenant_id, port_profile_id)736 self.assertEqual(port_profile[const.PPNAME], new_profile_name)737 self.assertEqual(result_port_profile_dict[const.PROFILE_NAME],738 new_profile_name)739 self.tearDownPortProfile(tenant_id, port_profile_id)740 LOG.debug("test_show_portprofile - tenant id: %s - END")741 def test_rename_portprofileDNE(self, tenant_id='test_tenant',742 profile_id='pr0005',743 new_profile_name='new_profile_name'):744 """745 Tests rename of a port-profile when network does not exist746 """747 LOG.debug("test_rename_portprofileDNE - START")748 self.assertRaises(cexc.PortProfileNotFound,749 self._l2network_plugin.rename_portprofile,750 tenant_id, profile_id, new_profile_name)751 LOG.debug("test_rename_portprofileDNE - END")752 def test_associate_portprofile(self, tenant_id='test_tenant'):753 """754 Tests association of a port-profile755 """756 LOG.debug("test_associate_portprofile - START")757 new_net_dict = self._l2network_plugin.create_network(758 tenant_id, self.network_name)759 port_dict = self._l2network_plugin.create_port(760 tenant_id, new_net_dict[const.NET_ID],761 self.state)762 port_profile_dict = self._l2network_plugin.create_portprofile(763 tenant_id, self.profile_name, self.qos)764 port_profile_id = port_profile_dict['profile_id']765 self._l2network_plugin.associate_portprofile(766 tenant_id, new_net_dict[const.NET_ID],767 port_dict[const.PORT_ID], port_profile_id)768 port_profile_associate = cdb.get_pp_binding(tenant_id, port_profile_id)769 self.assertEqual(port_profile_associate[const.PORTID],770 port_dict[const.PORT_ID])771 self.tearDownAssociatePortProfile(772 tenant_id, new_net_dict[const.NET_ID],773 port_dict[const.PORT_ID], port_profile_id)774 self.tearDownNetworkPort(775 tenant_id, new_net_dict[const.NET_ID],776 port_dict[const.PORT_ID])777 LOG.debug("test_associate_portprofile - END")778 def test_associate_portprofileDNE(self, tenant_id='test_tenant',779 net_id='0005', port_id='p00005',780 profile_id='pr0005'):781 """782 Tests association of a port-profile when a network does not exist783 """784 LOG.debug("test_associate_portprofileDNE - START")785 self.assertRaises(cexc.PortProfileNotFound,786 self._l2network_plugin.associate_portprofile,787 tenant_id, net_id, port_id, profile_id)788 LOG.debug("test_associate_portprofileDNE - END")789 def test_disassociate_portprofile(self, tenant_id='test_tenant',790 ):791 """792 Tests disassociation of a port-profile793 """794 LOG.debug("test_disassociate_portprofile - START")795 new_net_dict = self._l2network_plugin.create_network(796 tenant_id, self.network_name)797 port_dict = self._l2network_plugin.create_port(798 tenant_id, new_net_dict[const.NET_ID],799 self.state)800 port_profile_dict = self._l2network_plugin.create_portprofile(801 tenant_id, self.profile_name, self.qos)802 port_profile_id = port_profile_dict['profile_id']803 self._l2network_plugin.associate_portprofile(804 tenant_id, new_net_dict[const.NET_ID],805 port_dict[const.PORT_ID], port_profile_id)806 self._l2network_plugin.disassociate_portprofile(807 tenant_id, new_net_dict[const.NET_ID],808 port_dict[const.PORT_ID], port_profile_id)809 port_profile_associate = cdb.get_pp_binding(tenant_id, port_profile_id)810 self.assertEqual(port_profile_associate, [])811 self.tearDownPortProfile(tenant_id, port_profile_id)812 self.tearDownNetworkPort(813 tenant_id, new_net_dict[const.NET_ID],814 port_dict[const.PORT_ID])815 LOG.debug("test_disassociate_portprofile - END")816 def test_disassociate_portprofileDNE(self, tenant_id='test_tenant',817 net_id='0005', port_id='p00005', profile_id='pr0005'):818 """819 Tests disassociation of a port-profile when network does not exist820 """821 LOG.debug("test_disassociate_portprofileDNE - START")822 self.assertRaises(cexc.PortProfileNotFound,823 self._l2network_plugin.disassociate_portprofile,824 tenant_id, net_id, port_id, profile_id)825 LOG.debug("test_disassociate_portprofileDNE - END")826 def test_get_vlan_name(self, net_tenant_id=None, vlan_id="NewVlan",827 vlan_prefix=conf.VLAN_NAME_PREFIX):828 """829 Tests get vlan name830 """831 LOG.debug("test_get_vlan_name - START")832 if net_tenant_id:833 tenant_id = net_tenant_id834 else:835 tenant_id = self.tenant_id836 result_vlan_name = self._l2network_plugin._get_vlan_name(tenant_id,837 vlan_id)838 expected_output = vlan_prefix + vlan_id839 self.assertEqual(result_vlan_name, expected_output)840 LOG.debug("test_get_vlan_name - END")841 def test_validate_port_state(self, state=const.PORT_UP):842 """843 Tests validate port state844 """845 LOG.debug("test_validate_port_state - START")846 result = self._l2network_plugin._validate_port_state(state)847 self.assertEqual(result, True)848 LOG.debug("test_validate_port_state - END")849 def test_invalid_port_state(self, state="BADSTATE"):850 """851 Tests invalidate port state852 """853 LOG.debug("test_validate_port_state - START")854 self.assertRaises(exc.StateInvalid,855 self._l2network_plugin._validate_port_state,856 state)857 LOG.debug("test_validate_port_state - END")858 def setUp(self):859 """860 Set up function861 """862 self.tenant_id = "test_tenant"863 self.network_name = "test_network"864 self.profile_name = "test_tenant_port_profile"865 self.qos = "test_qos"866 self.state = const.PORT_UP867 self.net_id = '00005'868 self.port_id = 'p0005'869 self.remote_interface = 'new_interface'870 self._l2network_plugin = l2network_plugin.L2Network()871 """872 Clean up functions after the tests873 """874 def tearDown(self):875 """Clear the test environment"""876 # Remove database contents877 db.clear_db()878 def tearDownNetwork(self, tenant_id, network_dict_id):879 """880 Tear down the Network881 """882 self._l2network_plugin.delete_network(tenant_id, network_dict_id)883 def tearDownPortOnly(self, tenant_id, network_dict_id, port_id):884 """885 Tear doen the port886 """887 self._l2network_plugin.delete_port(tenant_id, network_dict_id, port_id)888 def tearDownNetworkPort(self, tenant_id, network_dict_id, port_id):889 """890 Tear down Network Port891 """892 self._l2network_plugin.delete_port(tenant_id, network_dict_id, port_id)893 self.tearDownNetwork(tenant_id, network_dict_id)894 def tearDownNetworkPortInterface(self, tenant_id, instance_tenant_id,895 instance_id, instance_desc, network_dict_id,896 port_id):897 """898 Tear down Network Port Interface899 """900 self._l2network_plugin.detach_port(instance_tenant_id, instance_id,901 instance_desc)902 self.tearDownNetworkPort(tenant_id, network_dict_id, port_id)903 def tearDownPortProfile(self, tenant_id, port_profile_id):904 """905 Tear down Port Profile906 """907 self._l2network_plugin.delete_portprofile(tenant_id, port_profile_id)908 def tearDownPortProfileBinding(self, tenant_id, port_profile_id):909 """910 Tear down port profile binding911 """912 self._l2network_plugin.delete_portprofile(tenant_id, port_profile_id)913 def tearDownAssociatePortProfile(self, tenant_id, net_id, port_id,914 port_profile_id):915 """916 Tear down associate port profile917 """918 self._l2network_plugin.disassociate_portprofile(919 tenant_id, net_id, port_id, port_profile_id)920 self.tearDownPortProfile(tenant_id, port_profile_id)921 def _make_net_dict(self, net_id, net_name, ports):922 res = {const.NET_ID: net_id, const.NET_NAME: net_name}923 res[const.NET_PORTS] = ports924 return res925 def _make_port_dict(self, port_id, state, net_id, attachment):926 res = {const.PORT_ID: port_id, const.PORT_STATE: state}927 res[const.NET_ID] = net_id928 res[const.ATTACHMENT] = attachment929 return res930 def _make_portprofile_dict(self, tenant_id, profile_id, profile_name,931 qos):932 profile_associations = self._make_portprofile_assc_list(933 tenant_id, profile_id)934 res = {const.PROFILE_ID: str(profile_id),935 const.PROFILE_NAME: profile_name,936 const.PROFILE_ASSOCIATIONS: profile_associations,937 const.PROFILE_VLAN_ID: None,938 const.PROFILE_QOS: qos}939 return res940 def _make_portprofile_assc_list(self, tenant_id, profile_id):941 plist = cdb.get_pp_binding(tenant_id, profile_id)942 assc_list = []943 for port in plist:944 assc_list.append(port[const.PORTID])...

Full Screen

Full Screen

setupkit.py

Source:setupkit.py Github

copy

Full Screen

1from urllib.parse import urljoin, urlparse2import requests3from common import superuser_login4from config import config5def api_call(auth_token, url, params={}, data_key=None, data_value=None):6 url_api_call = urljoin(config.HOST, url)7 json_call = {}8 if auth_token:9 json_call["RequestInfo"] = {"authToken": auth_token}10 if data_key:11 json_call[data_key] = data_value12 return requests.post(url_api_call, params=params, json=json_call).json()13def BANKS_CREATE_DEFAULT(tenant_id, bank_code, bank_name, active):14 return {15 "code": bank_code,16 "name": bank_name,17 "description": "",18 "active": active,19 "type": "",20 "tenantId": tenant_id,21 "deleteReason": None22 }23def search_bank(tenant_id, auth_token, code=None):24 search_criteria = {"tenantId": tenant_id}25 if code:26 search_criteria["code"] = code27 return api_call(auth_token, "egf-master/banks/_search", search_criteria)28def upsert_bank(tenant_id, auth_token, bank_code, bank_name, active=True, **kwargs):29 banks = search_bank(tenant_id, auth_token, bank_code)["banks"]30 if len(banks) > 0:31 banks[0]["code"] = bank_code32 banks[0]["name"] = bank_name33 banks[0]["active"] = active34 banks[0].update(kwargs)35 return api_call(auth_token, "egf-master/banks/_update", {"tenantId": tenant_id}, "banks", banks)36 else:37 data = BANKS_CREATE_DEFAULT(tenant_id, bank_code, bank_name, active)38 data.update(kwargs)39 return api_call(auth_token, "egf-master/banks/_create", {"tenantId": tenant_id}, "banks", [data])40def BRANCHES_CREATE_DEFAULT(tenant_id, bank_code, bank_name, bank_city, bank_state, person_number, person_contact,41 bank_id, active):42 return {43 "bank": {44 "id": bank_id,45 "code": bank_code,46 "name": bank_name,47 "description": None,48 "active": True,49 "type": None,50 "tenantId": tenant_id51 },52 "code": bank_code,53 "name": bank_name,54 "address": bank_city,55 "address2": None,56 "city": bank_city,57 "state": bank_state,58 "pincode": None,59 "phone": person_number,60 "fax": None,61 "contactPerson": person_contact,62 "active": active,63 "description": None,64 "micr": None,65 "tenantId": tenant_id,66 "deleteReason": None67 }68def search_bank_branches(tenant_id, auth_token, bank_code):69 search_criteria = {"tenantId": tenant_id}70 if bank_code:71 search_criteria["code"] = bank_code72 return api_call(auth_token, "egf-master/bankbranches/_search", search_criteria)73def upsert_bankbranches(tenant_id, auth_token, bank_code, bank_name, bank_city, bank_state, person_number,74 person_contact, active=True, **kwargs):75 banks = search_bank(tenant_id, auth_token, bank_code)["banks"]76 if len(banks) > 0:77 bank_id = banks[0]["id"]78 bank_branches = search_bank_branches(tenant_id, auth_token, bank_code)["bankBranches"]79 if len(bank_branches) > 0:80 bank_branches[0]["code"] = bank_code81 bank_branches[0]["name"] = bank_name82 bank_branches[0]["city"] = bank_city83 bank_branches[0]["state"] = bank_state84 bank_branches[0]["phone"] = person_number85 bank_branches[0]["contactPerson"] = person_contact86 bank_branches[0]["active"] = active87 bank_branches[0].update(kwargs)88 # "Do to this thinks"89 # return api_call(auth_token, "egf-master/bankbranches/_update", {"tenantId": tenant_id}, "bankBranches",90 # bank_branches)91 else:92 data = BRANCHES_CREATE_DEFAULT(tenant_id, bank_code, bank_name, bank_city, bank_state, person_number,93 person_contact, bank_id, active)94 for bank in banks:95 if bank["code"] == bank_code:96 data["bank"] = banks[0]97 data.update(kwargs)98 return api_call(auth_token, "egf-master/bankbranches/_create", {"tenantId": tenant_id}, "bankBranches",99 [data])100 else:101 raise Exception("No banks does not exists for {}".format(tenant_id))102def search_account_code_purposes(tenant_id, auth_token, name=None):103 search_criteria = {"tenantId": tenant_id}104 if name:105 search_criteria["name"] = name106 return api_call(auth_token, "egf-master/accountcodepurposes/_search", search_criteria107 )108def upsert_account_code_purposes(tenant_id, auth_token, name):109 code_purposes = search_account_code_purposes(tenant_id, auth_token, name)["accountCodePurposes"]110 if len(code_purposes) == 0:111 return api_call(auth_token, "egf-master/accountcodepurposes/_create", {"tenantId": tenant_id},112 "accountCodePurposes",113 [{114 "name": name,115 "tenantId": tenant_id}])116def CHART_OF_ACCOUNTS_DEFAULT(tenant_id, gl_code, name, type, chart_of_code_id=None, parent_id=None):117 data = {118 "glcode": gl_code,119 "name": name,120 "description": None,121 "isActiveForPosting": False,122 "type": type,123 "classification": 2,124 "functionRequired": False,125 "budgetCheckRequired": False,126 "majorCode": gl_code,127 "isSubLedger": False,128 "tenantId": tenant_id,129 }130 if chart_of_code_id:131 data["accountCodePurpose"] = {132 "id": chart_of_code_id,133 "tenantId": tenant_id134 }135 else:136 data["accountCodePurpose"] = None137 if parent_id:138 data["parentId"] = {139 "id": parent_id,140 "tenantId": tenant_id141 }142 else:143 data["parentId"] = None144 return data145def search_chart_of_accounts(tenant_id, auth_token, gl_code=None):146 search_criteria = {"tenantId": tenant_id}147 if gl_code:148 search_criteria["glcodes"] = gl_code149 return api_call(auth_token, "egf-master/chartofaccounts/_search", search_criteria)150def upsert_chart_of_accounts(tenant_id, auth_token, gl_code, name, type, chart_code_purpose_name, old_gl_code,151 **kwargs):152 chart_of_account = search_chart_of_accounts(tenant_id, auth_token, gl_code)["chartOfAccounts"]153 if chart_code_purpose_name is not None:154 chart_code_purpose = search_account_code_purposes(tenant_id, auth_token, chart_code_purpose_name)[155 "accountCodePurposes"]156 else:157 chart_code_purpose = []158 if old_gl_code is not None:159 chart_of_old_account = search_chart_of_accounts(tenant_id, auth_token, old_gl_code)["chartOfAccounts"]160 else:161 chart_of_old_account = []162 parent_id = None163 chart_of_code_id = None164 if len(chart_code_purpose) == 1:165 chart_of_code_id = chart_code_purpose[0]["id"]166 if len(chart_of_old_account) == 1:167 parent_id = chart_of_old_account[0]["id"]168 if len(chart_of_account) > 0:169 chart_of_account[0]["name"] = name170 chart_of_account[0]["type"] = type171 chart_of_account[0].update(kwargs)172 # return api_call(auth_token, "egf-master/chartofaccounts/_update", {"tenantId": tenant_id}, "chartOfAccounts",173 # chart_of_account)174 # do to think175 else:176 data = CHART_OF_ACCOUNTS_DEFAULT(tenant_id, gl_code, name, type, chart_of_code_id, parent_id)177 data.update(kwargs)178 return api_call(auth_token, "egf-master/chartofaccounts/_create", {"tenantId": tenant_id}, "chartOfAccounts",179 [data])180def FUNDS_DEFAULT(tenant_id, fund_name, fund_code, identifier):181 return {182 "name": fund_name,183 "code": fund_code,184 "identifier": identifier,185 "level": 0,186 "parent": None,187 "active": True,188 "tenantId": tenant_id,189 "deleteReason": None190 }191def search_funds(tenant_id, auth_token, code):192 search_criteria = {"tenantId": tenant_id}193 if code:194 search_criteria["code"] = code195 return api_call(auth_token, "egf-master/funds/_search", {"tenantId": tenant_id})196def upsert_funds(tenant_id, auth_token, fund_name, fund_code, identifier, **kwargs):197 fund = search_funds(tenant_id, auth_token, fund_code)["funds"]198 if len(fund) > 0:199 raise Exception("funds already exist for {}".format(tenant_id))200 else:201 data = FUNDS_DEFAULT(tenant_id, fund_name, fund_code, identifier)202 data.update(kwargs)203 return api_call(auth_token, "egf-master/funds/_create", {"tenantId": tenant_id}, "funds",204 [data])205def ACCOUNT_CREATE_DEFAULT(tenant_id, account_number, account_type, gl_code, branch_id, chart_id, fund_id):206 return {207 "bankBranch": {208 "id": branch_id,209 "tenantId": tenant_id210 },211 "chartOfAccount": {212 "id": chart_id,213 "glcode": gl_code,214 "tenantId": tenant_id215 },216 "fund": {217 "id": fund_id,218 "tenantId": tenant_id219 },220 "accountNumber": account_number,221 "accountType": account_type,222 "description": None,223 "active": True,224 "payTo": None,225 "type": "RECEIPTS_PAYMENTS",226 "tenantId": tenant_id,227 "createdBy": None,228 "deleteReason": None229 }230def search_bank_accounts(tenant_id, auth_token):231 return api_call(auth_token, "egf-master/bankaccounts/_search", {"tenantId": tenant_id})232def upsert_bank_accounts(tenant_id, auth_token, account_number, account_type, bank_code, gl_code, fund_code,233 **kwargs):234 bank_accounts = search_bank_accounts(tenant_id, auth_token)["bankAccounts"]235 bank_branches = search_bank_branches(tenant_id, auth_token, bank_code)["bankBranches"]236 accounts = search_chart_of_accounts(tenant_id, auth_token, gl_code)["chartOfAccounts"]237 funds = search_funds(tenant_id, auth_token)["funds"]238 branch_id = None239 chart_id = None240 fund_id = None241 if len(bank_branches) > 0:242 branch_id = bank_branches[0]["id"]243 else:244 raise Exception("bank branch not exist for bank {}".format(bank_code))245 if len(accounts) > 0:246 chart_id = accounts[0]["id"]247 else:248 raise Exception("chart of account not exist for gl_code{}".format(gl_code))249 if len(funds) > 0:250 fund_id = funds["id"]251 else:252 raise Exception(" funds not exist for {}".format(tenant_id))253 found = False254 for bank_account in bank_accounts:255 if bank_account["accountNumber"] == account_number:256 found = True257 break258 if not found:259 data = ACCOUNT_CREATE_DEFAULT(tenant_id, account_number, account_type, gl_code, branch_id, chart_id, fund_id)260 data.update(kwargs)261 return api_call(auth_token, "egf-master/bankaccounts/_create", {"tenantId": tenant_id}, "bankAccounts", [data])262 else:263 bank_accounts[0]["accountNumber"] = account_number264 bank_accounts[0]["accountType"] = account_type265 bank_accounts[0].update(kwargs)266 return api_call(auth_token, "egf-master/bankaccounts/_update", {"tenantId": tenant_id}, "bankAccounts",267 bank_account)268def BUSINESS_SERVICES_DEFAULT(tenant_id, business_service):269 return {270 "tenantId": tenant_id,271 "businessService": business_service,272 "collectionModesNotAllowed": [],273 "partPaymentAllowed": True,274 "callBackForApportioning": False,275 "callBackApportionURL": None,276 "auditDetails": None277 }278def search_business_services(tenant_id, auth_token, business_service=None):279 search_criteria = {"tenantId": tenant_id}280 if business_service:281 search_criteria["businessService"] = business_service282 return api_call(auth_token, "billing-service/businessservices/_search",283 search_criteria)284def upsert_business_services(tenant_id, auth_token, business_service, **kwargs):285 business_services = search_business_services(tenant_id, auth_token, business_service)["BusinessServiceDetails"]286 d = len(business_services)287 if len(business_services) == 0:288 data = BUSINESS_SERVICES_DEFAULT(tenant_id, business_service)289 data.update(kwargs)290 return api_call(auth_token, "billing-service/businessservices/_create",291 {"tenantId": tenant_id, "businessService": business_service}, "BusinessServiceDetails", [data])292 else:293 raise Exception("business services is already exist for {}".format(tenant_id))294def TAX_PERIODS_DEFAULT(tenant_id, service, period_cycle, financial_year):295 return {296 "tenantId": tenant_id,297 "fromDate": 1396310400000,298 "toDate": 1427846399000,299 "periodCycle": period_cycle,300 "service": service,301 "code": "PTAN" + financial_year,302 "financialYear": financial_year303 }304def search_tax_periods(tenant_id, auth_token, service, financial_year=None):305 search_criteria = {"tenantId": tenant_id, "service": service}306 if financial_year:307 search_criteria["financialYear"] = financial_year308 return api_call(auth_token, "billing-service/taxperiods/_search",309 search_criteria)310def upsert_tax_periods(tenant_id, auth_token, service, period_cycle, financial_year, **kwargs):311 tax_periods = search_tax_periods(tenant_id, auth_token, service, financial_year)["TaxPeriods"]312 if len(tax_periods) == 0:313 data = TAX_PERIODS_DEFAULT(tenant_id, service, period_cycle, financial_year)314 data.update(kwargs)315 return api_call(auth_token, "billing-service/taxperiods/_create", {"tenantId": tenant_id, "service": service},316 "TaxPeriods", [data])317def TAX_HEADS_DEFAULT(tenant_id, service, category, name, code):318 return {319 "tenantId": tenant_id,320 "category": category,321 "service": service,322 "name": name,323 "code": code,324 "isDebit": False,325 "isActualDemand": True,326 "validFrom": 1143849600000,327 "validTill": 1554076799000,328 "order": 1329 }330def search_tax_heads(tenant_id, auth_token, service, code):331 search_criteria = {"tenantId": tenant_id}332 if service and code:333 search_criteria["service"] = service334 search_criteria["code"] = code335 return api_call(auth_token, "billing-service/taxheads/_search", search_criteria)336def upsert_tax_heads(tenant_id, auth_token, service, category, name, code, **kwargs):337 tax_heads = search_tax_heads(tenant_id, auth_token, service, code)["TaxHeadMasters"]338 if len(tax_heads) == 0:339 data = TAX_HEADS_DEFAULT(tenant_id, service, category, name, code)340 data.update(kwargs)341 return api_call(auth_token, "billing-service/taxheads/_create", {"tenantId": tenant_id, "service": service},342 "TaxHeadMasters", [data])343 else:344 raise Exception("tax head alrady exist for {}".format(tenant_id))345def GL_CODE_MASTER_DEFAULT(tenant_id, tax_head, service, gl_code):346 return {347 "tenantId": tenant_id,348 "taxHead": tax_head,349 "service": service,350 "glCode": gl_code,351 "fromDate": 1143849600000,352 "toDate": 1554076799000353 }354def search_gl_code_masters(tenant_id, auth_token, service, gl_code=None):355 search_criteria = {"tenantId": tenant_id, "service": service}356 if gl_code:357 search_criteria["glCode"] = gl_code358 return api_call(auth_token, "billing-service/glcodemasters/_search", search_criteria359 )360def upsert_gl_code_master(tenant_id, auth_token, tax_head, service, gl_code, **kwargs):361 gl_code_master = search_gl_code_masters(tenant_id, auth_token, service, gl_code)["GlCodeMasters"]362 if len(gl_code_master) == 0:363 data = GL_CODE_MASTER_DEFAULT(tenant_id, tax_head, service, gl_code)364 data.update(kwargs)365 return api_call(auth_token, "billing-service/glcodemasters/_create",366 {"tenantId": tenant_id, "service": service}, "GlCodeMasters", [data])367def INSTRUMENT_TYPE_DEFAULT(tenant_id, name, description, active):368 import uuid369 return {370 "id": str(uuid.uuid4()),371 "name": name,372 "description": description,373 "active": active,374 "instrumentTypeProperties": [],375 "tenantId": tenant_id376 }377def search_instrument_types(tenant_id, auth_token, name=None):378 search_criteria = {"tenantId": tenant_id}379 if name:380 search_criteria["name"] = name381 return api_call(auth_token, "egf-instrument/instrumenttypes/_search", search_criteria)382def upsert_instrument_type(tenant_id, auth_token, name, description, active=True, **kwargs):383 instruments = search_instrument_types(tenant_id, auth_token, name)["instrumentTypes"]384 if len(instruments) > 0:385 instruments[0]["id"] = id386 instruments[0]["name"] = name387 instruments[0].update(kwargs)388 return api_call(auth_token, "egf-instrument/instrumenttypes/_update", {"tenantId": tenant_id},389 "instrumentTypes",390 instruments)391 else:392 data = INSTRUMENT_TYPE_DEFAULT(tenant_id, name, description, active)393 data.update(kwargs)394 return api_call(auth_token, "egf-instrument/instrumenttypes/_create", {"tenantId": tenant_id},395 "instrumentTypes",396 [data])397def INSTRUMENT_ACCOUNT_CODE_DEFAULT(tenant_id, instruments_type_id, name, gl_code):398 return {399 "instrumentType": {400 "id": instruments_type_id,401 "name": name402 },403 "accountCode": {404 "glcode": gl_code,405 "tenantId": tenant_id406 },407 "tenantId": tenant_id408 }409def search_instrument_account_codes(tenant_id, auth_token, name):410 search_criteria = {"tenantId": tenant_id}411 if name:412 search_criteria["name"] = name413 return api_call(auth_token, "egf-instrument/instrumentaccountcodes/_search", search_criteria)414def upsert_instrument_account_codes(tenant_id, auth_token, name, gl_code):415 instruments_account_code = search_instrument_account_codes(tenant_id, auth_token, name)["instrumentAccountCodes"]416 instruments = search_instrument_types(tenant_id, auth_token, name)["instrumentTypes"]417 if len(instruments) > 0:418 instruments_type_id = instruments[0]["id"]419 else:420 raise Exception("instrument type id is not exist for {}".format(name))421 if len(instruments_account_code) > 0:422 instruments_account_code["instrumentType"]["id"] = instruments_type_id423 instruments_account_code["instrumentType"]["name"] = name424 instruments_account_code["accountCode"]["glcode"] = gl_code425 instruments_account_code["accountCode"]["tenantId"] = tenant_id426 instruments_account_code["tenantId"] = tenant_id427 return api_call(auth_token, "egf-instrument/instrumentaccountcodes/_update", {"tenantId": tenant_id},428 "instrumentAccountCodes", instruments_account_code)429 else:430 data = INSTRUMENT_ACCOUNT_CODE_DEFAULT(tenant_id, instruments_type_id, name, gl_code)431 return api_call(auth_token, "egf-instrument/instrumentaccountcodes/_create", {"tenantId": tenant_id},432 "instrumentAccountCodes", [data])433def BUSINESS_CATEGORY_DEFAULT(tenant_id, name, code, active):434 return {435 "name": name,436 "code": code,437 "active": active,438 "tenantId": tenant_id,439 "version": None440 }441def search_business_category(tenant_id, name=None):442 search_criteria = {"tenantId": tenant_id}443 if name:444 search_criteria["name"] = name445 return api_call(auth_token, "egov-common-masters/businessCategory/_search", search_criteria)446def upsert_business_category(tenant_id, auth_token, name, code, active=True, **kwargs):447 business_category = search_business_category(tenant_id, name)["BusinessCategory"]448 if len(business_category) > 0:449 business_category[0]["name"] = name450 business_category[0]["code"] = code451 business_category[0].update(kwargs)452 return api_call(auth_token, "egov-common-masters/businessCategory/_update", {"tenantId": tenant_id},453 "BusinessCategory", business_category)454 else:455 data = BUSINESS_CATEGORY_DEFAULT(tenant_id, name, code, active)456 data.update(kwargs)457 return api_call(auth_token, "egov-common-masters/businessCategory/_create", {"tenantId": tenant_id},458 "BusinessCategory", [data])459def BUSINESS_DETAILS_DEFAULT(tenant_id, name, url, type, code, fund_code, category_id, function, depart):460 return {461 "name": name,462 "tenantId": tenant_id,463 "active": True,464 "businessUrl": url,465 "code": code,466 "businesstype": type,467 "fund": fund_code,468 "businessCategory": category_id,469 "function": function,470 "isVoucherApproved": False,471 "department": depart,472 "voucherCreation": False,473 "fundSource": "Online",474 "ordernumber": 1475 }476def search_business_details(tenant_id, code):477 search_criteria = {"tenantId": tenant_id}478 if code:479 # search_criteria["name"] =480 search_criteria["businessDetailsCodes"] = code481 return api_call(auth_token, "egov-common-masters/businessDetails/_search", search_criteria)482def upsert_business_details(tenant_id, auth_token, name, code, url, type, fund_code, function, depart, **kwargs):483 business_details = search_business_details(tenant_id, code)["BusinessDetails"]484 business_category = search_business_category(tenant_id, name)["BusinessCategory"]485 category_id = None486 if len(business_category) > 0:487 category_id = business_category[0]["id"]488 else:489 raise Exception(" business category is not exist for {}".format(code))490 if len(business_details) > 0:491 business_details[0]["name"] = name492 business_details[0]["code"] = code493 business_details[0]["businessurl"] = url494 business_details[0]["businesstype"] = type495 business_details[0]["fund"] = fund_code496 business_details[0]["function"] = function497 business_details[0]["department"] = depart498 business_details[0].update(kwargs)499 return api_call(auth_token, "egov-common-masters/businessDetails/_update", {"tenantId": tenant_id},500 "BusinessDetails", business_details)501 else:502 data = BUSINESS_DETAILS_DEFAULT(tenant_id, name, url, type, code, fund_code, category_id,503 function, depart)504 data.update(kwargs)505 return api_call(auth_token, "egov-common-masters/businessDetails/_create", {"tenantId": tenant_id},506 "BusinessDetails", [data])507def data_rainmaker_tl(tenant_id,auth_token,modules,locale):508 return api_call(auth_token, "egov-common-masters/businessCategory/_create", {"tenantId": tenant_id,"modules":modules,"locale":locale })509if __name__ == "__main__":510 auth_token = superuser_login()["access_token"]511 tenant_id = "pb.zirakpur"512 # print(search_bank("pb.zirakpur", auth_token))Cash-Cash In Transit513 # print(search_bank_branches(tenant_id, auth_token, 'AXIS'))514 # upsert_bank(tenant_id, auth_token, 'AXIS', 'AXIS BANK')515 # print(upsert_bankbranches(tenant_id, auth_token, 'AXIS', 'AXIS_BANK', 'zirakpur', 'Punjab', '9876543212', 'deep'))516 # print(upsert_account_code_purposes(tenant_id,auth_token, 'Cheque In Hand'))517 # print(upsert_account_code_purposes(tenant_id,auth_token, 'Demand Draft In Hand'))518 # print(upsert_account_code_purposes(tenant_id,auth_token, 'Online'))519 # print(upsert_account_code_purposes(tenant_id,auth_token, 'Cash In Hand'))520 # print(upsert_chart_of_accounts(tenant_id, auth_token, "1", "Income", "I", None, None))521 # print(upsert_chart_of_accounts(tenant_id,auth_token,'4501003','Cash-Cash ',"A","Cash In Hand","4501001"))522 # print(search_chart_of_accounts(tenant_id,auth_token,'4'))523 # print(upsert_chart_of_accounts(tenant_id, auth_token, '4', 'Assets', "A",None, None))524 # print(upsert_chart_of_accounts(tenant_id,auth_token,'450','Cash and Bank balance',"A",None,"4"))525 # print(upsert_chart_of_accounts(tenant_id,auth_token,'45010','Cash-Cash',"A",None,"450"))526 # #print(upsert_chart_of_accounts(tenant_id,auth_token,'4501056','Cash-Cheques-in-hand',"A","Cheque In Hand","45010"))527 # print(upsert_instrument_type(tenant_id,auth_token,'Cash',"Instrument for Cash payments"))528 # print(upsert_instrument_type(tenant_id,auth_token,'Online',"Instrument for Online payments"))529 # print(upsert_instrument_type(tenant_id,auth_token,'BankChallan',"Instrument for BankChallan paen_INyments"))530 # print(upsert_instrument_type(tenant_id,auth_token,'DD',"Instrument for Demand Draft payments"))531 # print(upsert_tax_heads(tenant_id,auth_token,'PT','PENALTYES','Pt adhoc penalty','PT_ADHOC_PENALTY'))532 # print(upsert_chart_of_accounts(tenant_id, auth_token, '4501051', 'Cash-Cheques-in',"A", "Cheque In Hand", "45010"))533 # print(search_tax_heads(tenant_id,auth_token,'PT'))534 # print(upsert_business_details(tenant_id,auth_token,'Citizen Services','CS',"/receipts/receipt-create.action","BILLBASED","01","909100","AS"))535 #print(upsert_tax_periods(tenant_id, auth_token, 'PT', "annual", "2019-20"))...

Full Screen

Full Screen

identity_platform.py

Source:identity_platform.py Github

copy

Full Screen

...80 @property81 def provider_id(self) -> str:82 return self._data["sign_in_method"]83 @property84 def tenant_id(self) -> Optional[str]:85 return self._data.get("tenant_id", None)86 @property87 def oauth(self) -> FirebaseOAuth:88 return FirebaseOAuth(89 id_token=self._data["oauth_id_token"],90 access_token=self._data["oauth_access_token"],91 refresh_token=self._data.get("oauth_refresh_token", None),92 token_secret=self._data.get("oauth_token_secret", None),93 )94 @property95 def event_type(self) -> str:96 return self._data["event_type"]97 @property98 def ip_address(self) -> str:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful