Best Python code snippet using localstack_python
gcp.py
Source:gcp.py  
...130    Enable instance inspection access for the given application.131    """132    log("Enabling instance inspection for {}", application_name)133    service_account = _get_service_account(model_uuid, application_name)134    _add_roles(service_account, ["roles/compute.viewer"])135def enable_network_management(model_uuid, application_name):136    """137    Enable network management for the given application.138    """139    log("Enabling network management for {}", application_name)140    service_account = _get_service_account(model_uuid, application_name)141    _add_roles(service_account, ["roles/compute.networkAdmin"])142def enable_security_management(model_uuid, application_name):143    """144    Enable security management for the given application.145    """146    log("Enabling security management for {}", application_name)147    service_account = _get_service_account(model_uuid, application_name)148    _add_roles(service_account, ["roles/compute.securityAdmin"])149def enable_block_storage_management(model_uuid, application_name):150    """151    Enable block storage (disk) management for the given application.152    """153    log("Enabling block storage management for {}", application_name)154    service_account = _get_service_account(model_uuid, application_name)155    _ensure_custom_role(156        name="compute.instanceStorageAdmin",157        title="Storage admin for instances",158        description="Attach and remove disks to instances",159        permissions=["compute.instances.attachDisk", "compute.instances.detachDisk"],160    )161    _add_roles(162        service_account,163        [164            "roles/compute.storageAdmin",165            "projects/{}/roles/compute.instanceStorageAdmin".format(PROJECT),166        ],167    )168def enable_dns_management(model_uuid, application_name):169    """170    Enable DNS management for the given application.171    """172    log("Enabling DNS management for {}", application_name)173    service_account = _get_service_account(model_uuid, application_name)174    _add_roles(service_account, ["roles/dns.admin"])175def enable_object_storage_access(model_uuid, application_name):176    """177    Enable object storage read-only access for the given application.178    """179    log("Enabling object storage read for {}", application_name)180    service_account = _get_service_account(model_uuid, application_name)181    _add_roles(service_account, ["roles/storage.objectViewer"])182def enable_object_storage_management(model_uuid, application_name):183    """184    Enable object storage management for the given application.185    """186    log("Enabling object store management for {}", application_name)187    service_account = _get_service_account(model_uuid, application_name)188    _add_roles(service_account, ["roles/storage.objectAdmin"])189def cleanup(relation_ids):190    """191    Cleanup unused account keys.192    """193    account_keys = kv().getrange("charm.gcp.account-keys.", strip=True)194    broken_relations = account_keys.keys() - relation_ids195    removed = []196    for relation_id in broken_relations:197        key = account_keys[relation_id]198        _gcloud(199            "iam",200            "service-accounts",201            "keys",202            "delete",203            "--iam-account",204            key["service-account"],205            key["id"],206        )207        log(208            "Deleted unused key {} for service account {}",209            key["id"],210            key["service-account"],211        )212        removed.append(relation_id)213    # TODO: purge no-longer used SAs and clean up project policy214    kv().unsetrange(removed, prefix="charm.gcp.account-keys.")215# Internal helpers216class GCPError(Exception):217    """218    Exception class representing an error returned from the gcloud tool.219    """220    pass221def _elide(s, max_len, ellipsis="..."):222    """223    Elide s in the middle to ensure it is under max_len.224    That is, shorten the string, inserting an ellipsis where the removed225    characters were to show that they've been removed.226    """227    if len(s) > max_len:228        hl = (max_len - len(ellipsis)) / 2229        headl, taill = floor(hl), ceil(hl)230        s = s[:headl] + ellipsis + s[-taill:]231    return s232def _gcloud(cmd, subcmd, *args, return_stderr=False):233    """234    Call the gcloud tool.235    """236    cmd = ["gcloud", "--quiet", "--format=json", cmd, subcmd]237    cmd.extend(args)238    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)239    stdout = result.stdout.decode("utf8").strip()240    stderr = result.stderr.decode("utf8").strip()241    if result.returncode != 0:242        raise GCPError(stderr)243    if return_stderr:244        # sometime gcloud is dumb about what it returns as the structured245        # output, forcing us to parse the unstructured stderr message246        return stderr247    if stdout:248        stdout = json.loads(stdout)249    return stdout250def _get_service_account(model_uuid, application_name):251    """252    Get or create the service account associated with the charm.253    """254    sa_cache_key = "charm.gcp.service_accounts"255    app_name = _elide(application_name.lower(), 14, "--")256    sa_name = "juju-gcp-{}-{}".format(app_name, model_uuid[-6:])257    service_accounts = kv().get(sa_cache_key, {})258    if sa_name in service_accounts:259        return service_accounts[sa_name]260    cloud_service_accounts = _gcloud("iam", "service-accounts", "list")261    service_accounts.update(262        {sa["email"].split("@")[0]: sa["email"] for sa in cloud_service_accounts}263    )264    kv().set(sa_cache_key, service_accounts)265    if sa_name in service_accounts:266        return service_accounts[sa_name]267    sa = _gcloud("iam", "service-accounts", "create", sa_name)268    service_account = sa["email"]269    service_accounts[sa_name] = service_account270    log("Created service account for {}: {}", application_name, service_account)271    kv().set(sa_cache_key, service_accounts)272    _add_roles(273        service_account,274        ["roles/iam.serviceAccountUser", "roles/iam.serviceAccountTokenCreator"],275    )276    return service_account277def _ensure_custom_role(name, title, description, permissions):278    roles = {279        role["name"].split("/")[-1]280        for role in _gcloud("iam", "roles", "list", "--project", PROJECT)281    }282    if name in roles:283        return284    _gcloud(285        "iam",286        "roles",287        "create",288        "--project",289        PROJECT,290        name,291        "--title",292        title,293        "--description",294        description,295        "--permissions",296        ",".join(permissions),297    )298    log("Created custom role {}", name)299def _add_roles(service_account, roles):300    for role in roles:301        _gcloud(302            "projects",303            "add-iam-policy-binding",304            PROJECT,305            "--member",306            "serviceAccount:{}".format(service_account),307            "--role",308            role,309        )...sql.py
Source:sql.py  
...55            session.flush()56        trust_dict = ref.to_dict()57        trust_dict['roles'] = added_roles58        return trust_dict59    def _add_roles(self, trust_id, session, trust_dict):60        roles = []61        for role in session.query(TrustRole).filter_by(trust_id=trust_id):62            roles.append({'id': role.role_id})63        trust_dict['roles'] = roles64    @sql.handle_conflicts(type='trust')65    def get_trust(self, trust_id):66        session = self.get_session()67        ref = (session.query(TrustModel).68               filter_by(deleted_at=None).69               filter_by(id=trust_id).first())70        if ref is None:71            return None72        if ref.expires_at is not None:73            now = timeutils.utcnow()74            if now > ref.expires_at:75                return None76        trust_dict = ref.to_dict()77        self._add_roles(trust_id, session, trust_dict)78        return trust_dict79    @sql.handle_conflicts(type='trust')80    def list_trusts(self):81        session = self.get_session()82        trusts = session.query(TrustModel).filter_by(deleted_at=None)83        return [trust_ref.to_dict() for trust_ref in trusts]84    @sql.handle_conflicts(type='trust')85    def list_trusts_for_trustee(self, trustee_user_id):86        session = self.get_session()87        trusts = (session.query(TrustModel).88                  filter_by(deleted_at=None).89                  filter_by(trustee_user_id=trustee_user_id))90        return [trust_ref.to_dict() for trust_ref in trusts]91    @sql.handle_conflicts(type='trust')...bt2sphinxurl.py
Source:bt2sphinxurl.py  
...77    url = m.group(2).replace("@ver@", bt2_version)78    # create and return an external link node79    node = docutils.nodes.reference(rawtext, link_text, internal=False, refuri=url)80    return [node], []81def _add_roles(app):82    # add the extension's roles; the role functions above expect the83    # project's version as their first parameter84    app.add_role("bt2man", functools.partial(_bt2man_role, app.config.version))85    app.add_role("bt2link", functools.partial(_bt2link_role, app.config.version))86def setup(app):87    app.connect("builder-inited", _add_roles)88    return {89        "version": app.config.version,90        "parallel_read_safe": True,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
