Best Python code snippet using tempest_python
database.py
Source:database.py  
"""Command-line manager and query helpers for the Cluster Launcher database.

The module builds an ``argparse`` command tree at import time, creates the
SQLite schema on demand, and exposes small helpers that each open the
database, run one statement, and close the connection again.
"""
import argparse
import asyncio
import contextlib
import os
import sqlite3
import yaml
from aiohttp import web
from prettytable import PrettyTable
from constants import DATABASE_NAME

parser = argparse.ArgumentParser(description="Cluster Launcher User Manager - A system to view, edit and manage the user database associated with the Cluster Launcher.")
subparsers = parser.add_subparsers(dest='subparser')

#Parser for User interactions
parser_user = subparsers.add_parser('user', help="Manipulate a User in the User Table")
user_subparsers = parser_user.add_subparsers(dest='options')

#Add Users to the User Table
parser_add_user = user_subparsers.add_parser('add', help="Add a User in the User Table")
parser_add_user.add_argument('user', nargs=1, help="Username of the user to add")

#Remove Users from the User Table
parser_remove_user = user_subparsers.add_parser('remove', help="Remove a User in the User Table")
parser_remove_user.add_argument('user', nargs=1, help="Username of the user to remove")

#Parser for the Volume Table
parser_volume = subparsers.add_parser('volume', help="Manipulate a Volume in the Volume Table")
volume_subparsers = parser_volume.add_subparsers(dest='options')

#Add Volume to Volume Table
parser_add_volume = volume_subparsers.add_parser('add')
parser_add_volume.add_argument('user', nargs=1)
parser_add_volume.add_argument('tenant_name', nargs=1)
parser_add_volume.add_argument('volume_name', nargs=1)
parser_add_volume.add_argument('-volSize', nargs=1)

#Remove Volume to Volume Table
parser_remove_volume = volume_subparsers.add_parser('remove')
parser_remove_volume.add_argument('user', nargs=1)
parser_remove_volume.add_argument('tenant_name', nargs=1)

#Parser for Tenant Interactions
parser_tenant = subparsers.add_parser('tenant', help="Manipulate a Tenant in the Tenant Table")
tenant_subparsers = parser_tenant.add_subparsers(dest='options')

#Populate the Tenant Table
parser_add_tenant = tenant_subparsers.add_parser('populate', help="Populate the Tenant's Table from tenants_conf.yml")
parser_remove_tenant = tenant_subparsers.add_parser('depopulate', help="Remove all rows in the Tenant's Table")
parser_search_tenant = tenant_subparsers.add_parser('search', help="Find a Tenant's ID")
parser_search_tenant.add_argument('tenant_name', nargs=1, help="Name of Tenant")

#Add users to the Tenant Table
parser_associate_user = subparsers.add_parser('link', help="Add a User to the Tenant's Database")
parser_associate_user.add_argument('user', nargs=1, help="Username of the user to add")
parser_associate_user.add_argument('tenant_name', nargs=1, help="Name of the tenant to add the user to")

#Remove users from the tenants database
parser_deassociate_user = subparsers.add_parser('delink', help="Remove a User from the Tenant's Database.")
parser_deassociate_user.add_argument('user', help="Username of the user to remove")
parser_deassociate_user.add_argument('tenant_name', help="Name of the tenant to remove the user from")

parser_cluster = subparsers.add_parser('cluster', help="Manage a Cluster in the database in the event of errors")
cluster_subparsers = parser_cluster.add_subparsers(dest='options')
parser_remove_cluster = cluster_subparsers.add_parser('remove', help="Remove a Cluster from the table")
parser_remove_cluster.add_argument('user', help="Username of the user to remove")

parser_network = subparsers.add_parser('network', help="Manage a Network in the database in the event of errors")
network_subparsers = parser_network.add_subparsers(dest='options')
parser_remove_network = network_subparsers.add_parser('remove', help="Remove a Network from the table")
parser_remove_network.add_argument('user', help="Username of the user to remove")
parser_remove_network.add_argument('tenant_name', help="Name of the tenant")

parser_search = subparsers.add_parser('search', help="Search to find a user-tenant mapping")
parser_search.add_argument('user', help="Username of the user to search for")
parser_search.add_argument('tenant_name', help="Name of the tenant to search for the user in")

#Parser for outputting information to users
parser_list = subparsers.add_parser('list', help="List information stored in tables")
list_subparsers = parser_list.add_subparsers(dest='options')
#Output tenant table information
parser_list_tenants = list_subparsers.add_parser('tenant')
#Output user table information
parser_list_users = list_subparsers.add_parser('user')
#Output mapping table information
parser_list_mappings = list_subparsers.add_parser('mapping')
#Output clusters table information
clusters_list_mappings = list_subparsers.add_parser('clusters')
#Output networks table information
networks_list_mappings = list_subparsers.add_parser('networks')
#Output volumes table information
volumes_list_mappings = list_subparsers.add_parser('volumes')


def initialise_database():
  """Open the database, creating the file and any missing tables first.

  Returns:
    A ``(connection, cursor)`` pair. The caller owns the connection and is
    responsible for closing it.
  """
  if not os.path.exists(DATABASE_NAME):
    with open(DATABASE_NAME, 'w'): pass
  #Initialise the database by creating the SQL tables if not present already
  db = sqlite3.connect(DATABASE_NAME)
  cursor = db.cursor()
  cursor.execute('''CREATE TABLE IF NOT EXISTS users(
    username TEXT PRIMARY KEY NOT NULL)
  ''')
  cursor.execute('''CREATE TABLE IF NOT EXISTS tenants(
    tenants_name TEXT PRIMARY KEY NOT NULL,
    tenants_id TEXT NOT NULL,
    lustre_description TEXT NOT NULL)
  ''')
  cursor.execute('''CREATE TABLE IF NOT EXISTS user_tenant_mapping(
    username TEXT NOT NULL REFERENCES users(username),
    tenant_name TEXT NOT NULL REFERENCES tenants(tenants_name),
    PRIMARY KEY (username, tenant_name))
  ''')
  cursor.execute('''CREATE TABLE IF NOT EXISTS volumes(
    username TEXT NOT NULL REFERENCES users(username),
    tenant_name TEXT NOT NULL REFERENCES tenants(tenants_name),
    volume_name TEXT NOT NULL,
    volume_size INTEGER,
    PRIMARY KEY (username, tenant_name))
  ''')
  cursor.execute('''CREATE TABLE IF NOT EXISTS clusters(
    username TEXT PRIMARY KEY NOT NULL,
    tenant TEXT NOT NULL,
    cluster_ip TEXT NOT NULL,
    num_workers TEXT NOT NULL)
  ''')
  cursor.execute('''
    CREATE TABLE IF NOT EXISTS networking(
      user_name TEXT NOT NULL,
      network_id TEXT NOT NULL,
      subnet_id TEXT NOT NULL,
      router_id TEXT NOT NULL,
      tenant_name TEXT NOT NULL,
      PRIMARY KEY (user_name, tenant_name)
    )
  ''')
  db.commit()
  return db, cursor


@contextlib.contextmanager
def _database():
  """Yield ``(db, cursor)`` and close the connection on exit, even on error."""
  db, cursor = initialise_database()
  try:
    yield db, cursor
  finally:
    db.close()


def _as_params(value):
  """Return *value* as the parameter sequence for a single ``?`` placeholder.

  The command line passes usernames as one-element lists (``nargs=1``) while
  other callers may pass a plain string. A bare string handed to
  ``cursor.execute`` is treated as a sequence of characters, so it has to be
  wrapped in a tuple.
  """
  if isinstance(value, (list, tuple)):
    return tuple(value)
  return (value,)


def _print_table(query, field_names, left_aligned=()):
  """Run *query* and print every returned row under *field_names*.

  Args:
    query: SELECT statement without parameters.
    field_names: column headings, one per selected column.
    left_aligned: headings whose column should be left-aligned.
  """
  with _database() as (db, cursor):
    cursor.execute(query)
    rows = cursor.fetchall()
  table = PrettyTable()
  table.field_names = field_names
  for heading in left_aligned:
    table.align[heading] = "l"
  for row in rows:
    table.add_row([str(value) for value in row])
  print(table)


def add_user(user):
  """Insert *user* into the users table.

  Accepts either a username string or a one-element list/tuple. Prints a
  message instead of raising when the username already exists.
  """
  with _database() as (db, cursor):
    try:
      cursor.execute("INSERT INTO users (username) VALUES (?)", _as_params(user))
      db.commit()
    except sqlite3.IntegrityError:
      print("Username is already registered in the Database")


def remove_user(user):
  """Delete *user* (a string or one-element list/tuple) from the users table."""
  with _database() as (db, cursor):
    cursor.execute("DELETE from users where username = ?", _as_params(user))
    db.commit()


def add_volume(user, tenant_name, volume_name):
  """Register *volume_name* for the (*user*, *tenant_name*) pair.

  Prints a message instead of raising when that pair already has a volume.
  """
  with _database() as (db, cursor):
    try:
      cursor.execute("INSERT INTO volumes (username, tenant_name, volume_name) VALUES (?,?,?)",(user,tenant_name,volume_name))
      db.commit()
    except sqlite3.IntegrityError:
      print("Username already has a volume registered to that tenant")


def remove_volume(user, tenant_name):
  """Delete the volume row for the (*user*, *tenant_name*) pair."""
  with _database() as (db, cursor):
    cursor.execute("DELETE from volumes where username = ? AND tenant_name = ?", (user, tenant_name))
    db.commit()


def add_cluster(user, tenant_name, cluster_ip, num_workers):
  """Record a cluster for *user*; prints a message if one already exists."""
  with _database() as (db, cursor):
    try:
      cursor.execute('''INSERT INTO clusters (username, tenant, cluster_ip, num_workers)
                      VALUES (?, ?, ?, ?)''', (user, tenant_name, cluster_ip, num_workers))
    except sqlite3.IntegrityError:
      print("This user already has a cluster registered. An error has occured")
    db.commit()


def remove_cluster(user):
  """Delete the cluster row registered for *user*."""
  with _database() as (db, cursor):
    cursor.execute("DELETE from clusters where username = ?", (user,))
    db.commit()


def add_network(username, network_id, subnet_id, router_id, tenant_name):
  """Record the network, subnet and router ids for (*username*, *tenant_name*).

  Raises:
    sqlite3.IntegrityError: if that pair already has a networking row.
  """
  with _database() as (db, cursor):
    cursor.execute('''INSERT INTO
      networking(user_name, network_id, subnet_id, router_id, tenant_name)
      VALUES(?, ?, ?, ?, ?)''',
      (username, network_id, subnet_id, router_id, tenant_name))
    db.commit()


def remove_network(user, tenant_name):
  """Delete the networking row for the (*user*, *tenant_name*) pair."""
  with _database() as (db, cursor):
    cursor.execute("DELETE from networking where user_name = ? AND tenant_name = ?", (user, tenant_name))
    db.commit()


def populate_tenants():
  """Insert every tenant listed in ``tenants_conf.yml`` into the tenants table.

  Tenants that are already present are left untouched.
  """
  with _database() as (db, cursor):
    with open('tenants_conf.yml', 'r') as tenant_file:
      # NOTE(review): yaml.Loader can construct arbitrary Python objects; if
      # tenants_conf.yml can ever come from an untrusted source, switch to
      # yaml.safe_load.
      data = yaml.load(tenant_file, Loader=yaml.Loader)
    for tenant, settings in data['tenants'].items():
      tenant_id = settings.get('id')
      lustre_desc = settings.get('network')
      try:
        cursor.execute("INSERT INTO tenants (tenants_name, tenants_id, lustre_description) VALUES (?, ?, ?)",(tenant, tenant_id, lustre_desc))
        db.commit()
      except sqlite3.IntegrityError:
        # Row already exists (or violates NOT NULL): skip it, keep going.
        pass


def depopulate_tenants():
  """Delete every row of the tenants table and print how many were removed."""
  with _database() as (db, cursor):
    cursor.execute("DELETE FROM tenants")
    print(str(cursor.rowcount) + " rows have been deleted from the tenant table")
    db.commit()


def link_tables(user, tenant):
  """Map *user* to *tenant* (lower-cased) with foreign keys enforced.

  Raises:
    sqlite3.IntegrityError: if the mapping already exists or the user or
      tenant is not present in its own table.
  """
  with _database() as (db, cursor):
    # Foreign-key enforcement is off by default in SQLite; enable it so the
    # mapping cannot reference an unknown user or tenant.
    cursor.execute('pragma foreign_keys=ON')
    tenant = tenant.lower()
    cursor.execute("INSERT INTO user_tenant_mapping (username, tenant_name) VALUES (?, ?)",(user, tenant))
    db.commit()


def delink_tables(user, tenant):
  """Remove the mapping between *user* and *tenant* (lower-cased)."""
  with _database() as (db, cursor):
    tenant = tenant.lower()
    cursor.execute("DELETE from user_tenant_mapping where username = ? AND tenant_name = ?;", (user, tenant))
    db.commit()


def display_tenants():
  """Print the tenants table ordered by tenant name."""
  _print_table("SELECT * from tenants ORDER BY tenants_name",
               ["Tenant Name", "Tenant ID", "Lustre Description"],
               left_aligned=("Tenant Name",))


def display_users():
  """Print the users table ordered by username."""
  _print_table("SELECT * from users ORDER BY username", ["Username"])


def display_mapping():
  """Print the user-tenant mapping table ordered by username."""
  _print_table("SELECT * from user_tenant_mapping ORDER BY username",
               ["Username", "Tenant Name"])


def display_clusters():
  """Print the clusters table ordered by username."""
  _print_table("SELECT * from clusters ORDER BY username",
               ["Username", "Tenant Name", "Cluster IP", "Number of Workers"])


def display_networks():
  """Print the networking table ordered by username."""
  _print_table("SELECT * from networking ORDER BY user_name",
               ["Username", "Network ID", "Subnet ID", "Router ID", "Tenant"])


def display_volumes():
  """Print the volumes table ordered by username."""
  _print_table("SELECT * from volumes ORDER BY username",
               ["Username", "Tenant Name", "Volume Name", "Volume Size"])


def request_mappings(request):
  """Return every user-tenant mapping as a JSON response (*request* is unused)."""
  with _database() as (db, cursor):
    cursor.execute("SELECT * from user_tenant_mapping ORDER BY username")
    results = cursor.fetchall()
  return web.json_response(results)


def search_user(user, tenant_name):
  """Return True when *user* is mapped to *tenant_name*, otherwise False."""
  with _database() as (db, cursor):
    cursor.execute("SELECT * from user_tenant_mapping WHERE username = ? AND tenant_name = ?",(user,tenant_name))
    result = cursor.fetchall()
  return bool(result)


def tenant_finder(user):
  """Return the tenant recorded for *user*'s cluster.

  Raises:
    IndexError: if *user* has no cluster row.
  """
  with _database() as (db, cursor):
    cursor.execute('''SELECT tenant FROM clusters WHERE username = ?''',
                    (user,))
    rows = cursor.fetchall()
  return rows[0][0]


def fetch_id(tenant_name):
  """Return the tenant id stored for *tenant_name*.

  Raises:
    TypeError: if no tenant with that name exists.
  """
  with _database() as (db, cursor):
    cursor.execute("SELECT tenants_id from tenants WHERE tenants_name = ?", [tenant_name])
    return cursor.fetchone()[0]


def lustre_status(tenant_name):
  """Return the ``lustre_description`` stored for *tenant_name*.

  Raises:
    TypeError: if no tenant with that name exists.
  """
  with _database() as (db, cursor):
    cursor.execute("SELECT lustre_description from tenants WHERE tenants_name = ?",[tenant_name])
    return cursor.fetchone()[0]


def checkVolumes(username, tenant_name):
  """Return the volume name for (*username*, *tenant_name*), or None if unset."""
  with _database() as (db, cursor):
    cursor.execute("SELECT volume_name from volumes WHERE username = ? AND tenant_name = ?", [username, tenant_name])
    row = cursor.fetchone()
  return row[0] if row else None


def checkMappings(request):
  """Return a JSON object mapping each of the caller's tenants to its volume.

  The username is read from the ``X-Forwarded-User`` request header; when the
  header is absent a fixed test username is used.
  """
  #For testing purposes the username falls back to "non-swarm-test"
  username = request.headers.get('X-Forwarded-User', "non-swarm-test")
  with _database() as (db, cursor):
    cursor.execute("SELECT tenant_name from user_tenant_mapping WHERE username = ?", [username])
    list_of_tenant_names = cursor.fetchall()
  array = {row[0]: checkVolumes(username, row[0]) for row in list_of_tenant_names}
  return web.json_response(array)


if __name__ == '__main__':
  args = parser.parse_args()
  #Manage Users Table
  if args.subparser == "user":
    #Add a user to the user's table
    if args.options == "add":
      add_user(args.user)
    #Remove a user from the user's table
    if args.options == "remove":
      remove_user(args.user)
  if args.subparser == "volume":
    if args.options == "add":
      add_volume(args.user[0], args.tenant_name[0], args.volume_name[0])
    if args.options == "remove":
      remove_volume(args.user[0], args.tenant_name[0])
  if args.subparser == "cluster":
    if args.options == "remove":
      remove_cluster(args.user)
  if args.subparser == "network":
    if args.options == "remove":
      remove_network(args.user, args.tenant_name)
  #Populate the Tenant's Table
  if args.subparser == "tenant":
    if args.options == "populate":
      populate_tenants()
    if args.options == "depopulate":
      depopulate_tenants()
    if args.options == "search":
      # NOTE(review): the returned id is discarded here, so nothing is shown.
      fetch_id(args.tenant_name[0])
  #Add links in the User-Tenant Mapping Table
  if args.subparser == "link":
    link_tables(args.user[0], args.tenant_name[0])
  #Delink in the User-Tenant Mapping Table
  if args.subparser == "delink":
    delink_tables(args.user, args.tenant_name)
  #List the tenants currently stored
  if args.subparser == "list":
    if args.options == "tenant":
      display_tenants()
    if args.options == "user":
      display_users()
    if args.options == "mapping":
      display_mapping()
    if args.options == "clusters":
      display_clusters()
    if args.options == "networks":
      display_networks()
    if args.options == "volumes":
      display_volumes()
  if args.subparser == "search":...
tenant_policy.py
Source:tenant_policy.py  
import logging
from sqlalchemy.sql import and_
from .. import exceptions, permissions, paginate
from .. import models
from ..api.permission import Permission
from ..api.tenant import Tenant
from ..models.Permission import PermissionType

logger = logging.getLogger(__name__)
TENANT_NAME = 'dataall'


class TenantPolicy:
    """Static helpers for querying and managing tenant-level group policies."""

    @staticmethod
    def is_tenant_admin(groups: [str]):
        """Return True when ``'DAAdministrators'`` is one of *groups*."""
        return bool(groups) and 'DAAdministrators' in groups

    @staticmethod
    def check_user_tenant_permission(
        session, username: str, groups: [str], tenant_name: str, permission_name: str
    ):
        """Return a truthy value when the user may perform *permission_name*.

        Tenant administrators always pass. Otherwise the matching tenant
        policy is returned.

        Raises:
            exceptions.TenantUnauthorized: if no matching policy exists.
        """
        if TenantPolicy.is_tenant_admin(groups):
            return True

        tenant_policy = TenantPolicy.has_user_tenant_permission(
            session=session,
            username=username,
            groups=groups,
            permission_name=permission_name,
            tenant_name=tenant_name,
        )
        if not tenant_policy:
            raise exceptions.TenantUnauthorized(
                username=username,
                action=permission_name,
                tenant_name=tenant_name,
            )
        return tenant_policy

    @staticmethod
    def has_user_tenant_permission(
        session, username: str, groups: [str], tenant_name: str, permission_name: str
    ):
        """Return the first policy granting *permission_name* to any of *groups*.

        Returns False when *username*, *permission_name* or *groups* is empty,
        and None when no policy matches.
        """
        # Without groups there is nothing to match; this also avoids handing
        # None to ``in_()``.
        if not username or not permission_name or not groups:
            return False

        tenant_policy: models.TenantPolicy = (
            session.query(models.TenantPolicy)
            .join(
                models.TenantPolicyPermission,
                models.TenantPolicy.sid == models.TenantPolicyPermission.sid,
            )
            .join(
                models.Tenant,
                models.Tenant.tenantUri == models.TenantPolicy.tenantUri,
            )
            .join(
                models.Permission,
                models.Permission.permissionUri
                == models.TenantPolicyPermission.permissionUri,
            )
            .filter(
                models.TenantPolicy.principalId.in_(groups),
                models.Permission.name == permission_name,
                models.Tenant.name == tenant_name,
            )
            .first()
        )
        return tenant_policy

    @staticmethod
    def has_group_tenant_permission(
        session, group_uri: str, tenant_name: str, permission_name: str
    ):
        """Return the policy granting *permission_name* to *group_uri*, else False."""
        if not group_uri or not permission_name:
            return False

        tenant_policy: models.TenantPolicy = (
            session.query(models.TenantPolicy)
            .join(
                models.TenantPolicyPermission,
                models.TenantPolicy.sid == models.TenantPolicyPermission.sid,
            )
            .join(
                models.Tenant,
                models.Tenant.tenantUri == models.TenantPolicy.tenantUri,
            )
            .join(
                models.Permission,
                models.Permission.permissionUri
                == models.TenantPolicyPermission.permissionUri,
            )
            .filter(
                and_(
                    models.TenantPolicy.principalId == group_uri,
                    models.Permission.name == permission_name,
                    models.Tenant.name == tenant_name,
                )
            )
            .first()
        )
        if not tenant_policy:
            return False
        return tenant_policy

    @staticmethod
    def find_tenant_policy(session, group_uri: str, tenant_name: str):
        """Return the policy of *group_uri* on *tenant_name*, or None.

        Raises:
            exceptions.RequiredParameter: if either argument is empty.
        """
        TenantPolicy.validate_find_tenant_policy(group_uri, tenant_name)

        tenant_policy = (
            session.query(models.TenantPolicy)
            .join(
                models.Tenant, models.Tenant.tenantUri == models.TenantPolicy.tenantUri
            )
            .filter(
                and_(
                    models.TenantPolicy.principalId == group_uri,
                    models.Tenant.name == tenant_name,
                )
            )
            .first()
        )
        return tenant_policy

    @staticmethod
    def validate_find_tenant_policy(group_uri, tenant_name):
        """Raise ``RequiredParameter`` when *group_uri* or *tenant_name* is empty."""
        if not group_uri:
            raise exceptions.RequiredParameter(param_name='group_uri')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')

    @staticmethod
    def attach_group_tenant_policy(
        session,
        group: str,
        permissions: [str],
        tenant_name: str,
    ) -> models.TenantPolicy:
        """Ensure *group* has a policy on *tenant_name* carrying *permissions*.

        Returns:
            The existing or newly created policy.

        Raises:
            exceptions.RequiredParameter: if any argument is empty.
        """
        TenantPolicy.validate_attach_tenant_policy(group, permissions, tenant_name)

        policy = TenantPolicy.save_group_tenant_policy(session, group, tenant_name)

        TenantPolicy.add_permission_to_group_tenant_policy(
            session, group, permissions, tenant_name, policy
        )
        return policy

    @staticmethod
    def validate_attach_tenant_policy(group, permissions, tenant_name):
        """Raise ``RequiredParameter`` for the first empty argument."""
        if not group:
            raise exceptions.RequiredParameter(param_name='group')
        if not permissions:
            raise exceptions.RequiredParameter(param_name='permissions')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')

    @staticmethod
    def save_group_tenant_policy(session, group, tenant_name):
        """Return *group*'s policy on *tenant_name*, creating and committing it if absent."""
        TenantPolicy.validate_save_tenant_policy(group, tenant_name)

        policy = TenantPolicy.find_tenant_policy(session, group, tenant_name)
        if not policy:
            policy = models.TenantPolicy(
                principalId=group,
                principalType='GROUP',
                tenant=Tenant.get_tenant_by_name(session, tenant_name),
            )
            session.add(policy)
            session.commit()
        return policy

    @staticmethod
    def validate_save_tenant_policy(group, tenant_name):
        """Raise ``RequiredParameter`` when *group* or *tenant_name* is empty."""
        if not group:
            raise exceptions.RequiredParameter(param_name='group')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')

    @staticmethod
    def add_permission_to_group_tenant_policy(
        session, group, permissions, tenant_name, policy
    ):
        """Attach to *policy* every permission in *permissions* the group lacks."""
        TenantPolicy.validate_add_permission_to_tenant_policy_params(
            group, permissions, policy, tenant_name
        )

        for permission in permissions:
            if not TenantPolicy.has_group_tenant_permission(
                session,
                group_uri=group,
                permission_name=permission,
                tenant_name=tenant_name,
            ):
                TenantPolicy.associate_permission_to_tenant_policy(
                    session, policy, permission
                )

    @staticmethod
    def validate_add_permission_to_tenant_policy_params(
        group, permissions, policy, tenant_name
    ):
        """Raise ``RequiredParameter`` for the first empty argument."""
        if not group:
            raise exceptions.RequiredParameter(param_name='group')
        TenantPolicy.validate_add_permissions_params(permissions, policy, tenant_name)

    @staticmethod
    def validate_add_permissions_params(permissions, policy, tenant_name):
        """Raise ``RequiredParameter`` for the first empty argument."""
        if not permissions:
            raise exceptions.RequiredParameter(param_name='permissions')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')
        if not policy:
            raise exceptions.RequiredParameter(param_name='policy')

    @staticmethod
    def associate_permission_to_tenant_policy(session, policy, permission):
        """Link the tenant permission named *permission* to *policy* and commit."""
        policy_permission = models.TenantPolicyPermission(
            sid=policy.sid,
            permissionUri=Permission.get_permission_by_name(
                session, permission, PermissionType.TENANT.name
            ).permissionUri,
        )
        session.add(policy_permission)
        session.commit()

    @staticmethod
    def get_tenant_policy_permissions(session, group_uri, tenant_name):
        """Return the permissions attached to *group_uri*'s policy on *tenant_name*.

        Returns an empty list when the group has no policy on that tenant.

        Raises:
            exceptions.RequiredParameter: if either identifier is empty.
        """
        if not group_uri:
            raise exceptions.RequiredParameter(param_name='group_uri')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')

        policy = TenantPolicy.find_tenant_policy(
            session=session,
            group_uri=group_uri,
            tenant_name=tenant_name,
        )
        # A group without a policy simply has no permissions; do not
        # dereference None.
        if not policy:
            return []
        return [p.permission for p in policy.permissions]

    @staticmethod
    def delete_tenant_policy(
        session,
        group: str,
        tenant_name: str,
    ) -> bool:
        """Delete *group*'s policy on *tenant_name* with its permission links.

        Returns True whether or not a policy existed.
        """
        policy = TenantPolicy.find_tenant_policy(
            session, group_uri=group, tenant_name=tenant_name
        )
        if policy:
            for permission in policy.permissions:
                session.delete(permission)
            session.delete(policy)
            session.commit()

        return True

    @staticmethod
    def list_group_tenant_permissions(
        session, username, groups, uri, data=None, check_perm=None
    ):
        """Return the tenant permissions of group *uri*; administrators only.

        Raises:
            exceptions.RequiredParameter: if *groups* or *uri* is empty.
            exceptions.UnauthorizedOperation: if the caller is not a tenant
                administrator.
        """
        if not groups:
            raise exceptions.RequiredParameter('groups')
        if not uri:
            raise exceptions.RequiredParameter('groupUri')

        if not TenantPolicy.is_tenant_admin(groups):
            raise exceptions.UnauthorizedOperation(
                action='LIST_TENANT_TEAM_PERMISSIONS',
                message=f'User: {username} is not allowed to manage tenant permissions',
            )

        return TenantPolicy.get_tenant_policy_permissions(
            session=session,
            group_uri=uri,
            tenant_name=TENANT_NAME,
        )

    @staticmethod
    def list_tenant_groups(session, username, groups, uri, data=None, check_perm=None):
        """Return a paginated dict of groups owning a tenant policy; administrators only.

        *data* may carry ``term`` (substring filter), ``page`` and ``pageSize``.

        Raises:
            exceptions.RequiredParameter: if *groups* is empty.
            exceptions.UnauthorizedOperation: if the caller is not a tenant
                administrator.
        """
        if not groups:
            raise exceptions.RequiredParameter('groups')

        if not TenantPolicy.is_tenant_admin(groups):
            raise exceptions.UnauthorizedOperation(
                action='LIST_TENANT_TEAMS',
                message=f'User: {username} is not allowed to manage tenant permissions',
            )

        # ``data`` defaults to None; normalise so the paging lookups below work.
        data = data or {}

        query = session.query(
            models.TenantPolicy.principalId.label('name'),
            models.TenantPolicy.principalId.label('groupUri'),
        ).filter(
            and_(
                models.TenantPolicy.principalType == 'GROUP',
                models.TenantPolicy.principalId != 'DAAdministrators',
            )
        )

        if data.get('term'):
            query = query.filter(
                models.TenantPolicy.principalId.ilike('%' + data.get('term') + '%')
            )

        return paginate(
            query=query,
            page=data.get('page', 1),
            page_size=data.get('pageSize', 10),
        ).to_dict()

    @staticmethod
    def list_tenant_permissions(session, username, groups):
        """Return the permission record of every name in ``permissions.TENANT_ALL``.

        Raises:
            exceptions.UnauthorizedOperation: if the caller is not a tenant
                administrator.
        """
        if not TenantPolicy.is_tenant_admin(groups):
            raise exceptions.UnauthorizedOperation(
                action='LIST_TENANT_TEAM_PERMISSIONS',
                message=f'User: {username} is not allowed to manage tenant permissions',
            )
        return [
            Permission.find_permission_by_name(
                session=session,
                permission_name=p,
                permission_type=PermissionType.TENANT.name,
            )
            for p in permissions.TENANT_ALL
        ]

    @staticmethod
    def update_group_permissions(
        session, username, groups, uri, data=None, check_perm=None
    ):
        """Replace the tenant permissions of group *uri* with ``data['permissions']``.

        Returns True on success.

        Raises:
            exceptions.UnauthorizedOperation: if the caller is not a tenant
                administrator.
            exceptions.TenantPermissionUnauthorized: if a requested permission
                is not a tenant permission.
        """
        TenantPolicy.validate_params(data)

        if not TenantPolicy.is_tenant_admin(groups):
            # The exception used to be constructed but never raised, which let
            # non-administrators through.
            raise exceptions.UnauthorizedOperation(
                action='UPDATE_TENANT_TEAM_PERMISSIONS',
                message=f'User: {username} is not allowed to manage tenant permissions',
            )

        TenantPolicy.validate_permissions(
            session, TENANT_NAME, data['permissions'], uri
        )

        TenantPolicy.delete_tenant_policy(
            session=session, group=uri, tenant_name=TENANT_NAME
        )
        TenantPolicy.attach_group_tenant_policy(
            session=session,
            group=uri,
            permissions=data['permissions'],
            tenant_name=TENANT_NAME,
        )

        return True

    @staticmethod
    def validate_permissions(session, tenant_name, g_permissions, group):
        """Check the requested names and return their permission records.

        Raises:
            exceptions.TenantPermissionUnauthorized: if any requested name is
                not listed in ``permissions.TENANT_ALL``.
        """
        g_permissions = list(set(g_permissions))

        # Compare each requested name, not the list as a whole: a list is
        # never an element of TENANT_ALL, so the old check could not work.
        if any(p not in permissions.TENANT_ALL for p in g_permissions):
            raise exceptions.TenantPermissionUnauthorized(
                action='UPDATE_TENANT_TEAM_PERMISSIONS',
                group_name=group,
                tenant_name=tenant_name,
            )

        return [
            Permission.find_permission_by_name(
                session=session,
                permission_name=p,
                permission_type=PermissionType.TENANT.name,
            )
            for p in g_permissions
        ]

    @staticmethod
    def validate_params(data):
        if not data:
            raise exceptions.RequiredParameter('data')
        if not data.get('permissions'):...
multi_db_management.py
Source:multi_db_management.py  
1from models import Tenant2from models import db3from caching import simple_cache4from flask import current_app5MYSQL_URI = 'mysql+pymysql://user:pwd@localhost/{}?charset=utf8'6@simple_cache7def get_known_tenants():8    tenants = Tenant.query.all()9    return [i.name for i in tenants]10def prepare_bind(tenant_name):11    if tenant_name not in current_app.config['SQLALCHEMY_BINDS']:12        current_app.config['SQLALCHEMY_BINDS'][tenant_name] = MYSQL_URI.format(tenant_name)13    return current_app.config['SQLALCHEMY_BINDS'][tenant_name]14def get_tenant_session(tenant_name):15    if tenant_name not in get_known_tenants():16        return None17    prepare_bind(tenant_name)18    engine = db.get_engine(current_app, bind=tenant_name)19    session_maker = db.sessionmaker()20    session_maker.configure(bind=engine)21    session = session_maker()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
