Best Python code snippet using localstack_python
acme_store.py
Source:acme_store.py  
# SPDX-License-Identifier: MPL-2.0
# Copyright 2022 John Mille <john@ews-network.net>

"""
Manages the config-dir/accounts settings to avoid creating new accounts for every new certificate.
"""

import datetime
import json
import os
import stat
from shutil import rmtree
from tempfile import TemporaryDirectory
from urllib.parse import urlparse

from boto3.session import Session
from compose_x_common.aws import get_session
from dateutil import parser as dateparser

from certbot_aws_store.acme_config import Account, AcmeConfig
from certbot_aws_store.backends import SECRET_ARN_RE
from certbot_aws_store.utils import easy_read


class AcmeStore:
    """
    Keeps ACME account files on disk in sync with a Secrets Manager secret.

    On creation the secret is pulled (when it exists), its accounts are
    written under ``accounts_path`` and, for every endpoint, only the most
    recently created account is kept on disk. ``save`` pushes the accounts
    found on disk, merged with the ones pulled at start, back to the secret.
    """

    pkey_file_name = "private_key.json"
    regr_file_name = "regr.json"
    meta_file_name = "meta.json"
    # Maps each account file name to an ``Account`` attribute name.
    files = {
        pkey_file_name: "privateKey",
        regr_file_name: "registration",
        meta_file_name: "metadata",
    }
    accounts_dir_name: str = "accounts"

    def __init__(
        self,
        certs_store_arn: str,
        override_directory: str = None,
        session: Session = None,
    ):
        """
        :param str certs_store_arn: ARN or name of the secret holding the accounts
        :param str override_directory: existing working directory to use instead
            of a new temporary directory
        :param Session session: passed to ``get_session`` to obtain the session used
            for Secrets Manager calls
        """
        self._store_arn = certs_store_arn
        self.session = get_session(session)
        self.staging_account = None
        self.account = None
        self.backup_config = None
        if override_directory:
            self.directory = override_directory
            try:
                self.accounts_path = find_accounts_dir(override_directory)
            except OSError:
                # No ``accounts`` folder anywhere in the tree: create the default one.
                self.accounts_path = f"{self.config_dir}/{self.accounts_dir_name}"
                os.makedirs(self.accounts_path, exist_ok=True)
        else:
            # Keep a reference to the TemporaryDirectory so it lives as long as the store.
            self.temp_dir = TemporaryDirectory()
            self.directory = self.temp_dir.name
            self.accounts_path = f"{self.config_dir}/{self.accounts_dir_name}"
            os.makedirs(self.accounts_path, exist_ok=True)
        self.init_pull()

    @property
    def existing_accounts(self) -> list[Account]:
        """Accounts currently found on disk under ``accounts_path``."""
        return get_acme_accounts(self.accounts_path)

    @property
    def config_dir(self) -> str:
        return f"{self.directory}/config-dir"

    @property
    def logs_dir(self) -> str:
        return f"{self.directory}/logs-dir"

    @property
    def work_dir(self) -> str:
        return f"{self.directory}/work-dir"

    @property
    def config(self) -> AcmeConfig:
        """A fresh ``AcmeConfig`` built from the accounts currently on disk."""
        accounts = get_acme_accounts(self.accounts_path)
        return AcmeConfig(accounts=accounts)

    def init_pull(
        self,
    ):
        """
        Pulls the accounts secret and lays its accounts out on disk.

        A missing secret is not an error: a message is printed and the store
        starts empty. Any other failure is printed and re-raised. In both
        non-failing cases the execution accounts are then selected.
        """
        client = self.session.client("secretsmanager")
        try:
            config_r = client.get_secret_value(SecretId=self.secret_name)
            config_content = json.loads(config_r["SecretString"])
            config_content["store_arn"] = config_r["ARN"]
            # From here on, refer to the secret by the ARN the service returned.
            self._store_arn = config_r["ARN"]
            config = AcmeConfig(**config_content)
            self.layout_accounts_folders(config)
        except client.exceptions.ResourceNotFoundException:
            print(
                f"Secret {self._store_arn} does not exist. Will initialize after first certificate"
            )
        except Exception as error:
            print(error)
            raise
        self.set_execution_accounts()

    def layout_accounts_folders(self, config: AcmeConfig):
        """
        Writes every account of ``config`` under ``accounts_path`` and records
        the resulting on-disk accounts in ``backup_config``.
        """
        for endpoint, accounts in accounts_per_endpoints(config).items():
            directory_path = create_account_endpoint_dirs(self.accounts_path, endpoint)
            for account in accounts:
                _account_path = f"{directory_path}/{account.dirname}"
                os.makedirs(_account_path, exist_ok=True)
                write_accounts_files(_account_path, account)
        self.backup_config = AcmeConfig(accounts=get_acme_accounts(self.accounts_path))

    def set_execution_accounts(self):
        """
        For each endpoint, keeps only the most recently created account on disk
        and stores it in ``staging_account`` or ``account``.

        Paths are built from ``accounts_path`` so that they match where
        ``layout_accounts_folders`` wrote the accounts.
        """
        for endpoint, accounts in accounts_per_endpoints(self.config).items():
            endpoint_dir = f"{self.accounts_path}/{endpoint}/directory"
            # max() keeps the first account on ties, as the original scan did.
            latest_account = max(accounts, key=get_meta_creation_dt)
            latest_account_path = f"{endpoint_dir}/{latest_account.dirname}"
            to_clear = [f"{endpoint_dir}/{account.dirname}" for account in accounts]
            set_latest_endpoint_account(
                endpoint, latest_account, latest_account_path, to_clear
            )
            if latest_account.is_staging():
                self.staging_account = latest_account
            else:
                self.account = latest_account

    @property
    def secret_name(self) -> str:
        """The secret name extracted from the ARN, or the stored value when it is not an ARN."""
        arn_match = SECRET_ARN_RE.match(self._store_arn)
        if arn_match:
            return arn_match.group("name")
        return self._store_arn

    def save(self):
        """
        Stores the merged accounts in the secret, creating the secret when it
        does not exist. Other errors are printed and not re-raised.
        """
        client = self.session.client("secretsmanager")
        accounts = self.merge_used_accounts_with_backup()
        secret_value = AcmeConfig(accounts=accounts).json()
        print(
            "Saving accounts",
            [(_act.account_id, _act.endpoint) for _act in accounts],
        )
        try:
            client.put_secret_value(SecretId=self._store_arn, SecretString=secret_value)
        except client.exceptions.ResourceNotFoundException:
            client.create_secret(
                Name=self.secret_name,
                SecretString=secret_value,
                Tags=[
                    {"Key": "Name", "Value": self.secret_name},
                    {"Key": "certbot_route53_store", "Value": str(True)},
                ],
            )
        except Exception as error:
            print(error)

    def merge_used_accounts_with_backup(self) -> list[Account]:
        """
        Returns the accounts on disk plus the backed-up accounts that are no
        longer on disk.

        Only backup accounts whose endpoint is still in use are restored, and
        each one is added at most once.

        :rtype: list[Account]
        """
        if not self.backup_config or not isinstance(self.backup_config, AcmeConfig):
            return self.config.accounts
        merged = list(self.config.accounts)
        used_endpoints = {account.endpoint for account in merged}
        known = {(account.endpoint, account.account_id) for account in merged}
        for _bkp_account in self.backup_config.accounts:
            key = (_bkp_account.endpoint, _bkp_account.account_id)
            if _bkp_account.endpoint not in used_endpoints or key in known:
                continue
            known.add(key)
            merged.append(_bkp_account)
        return merged


def create_account_endpoint_dirs(accounts_path: str, endpoint: str) -> str:
    """
    Creates ``<accounts_path>/<endpoint>/directory`` with owner-only
    permissions on both levels.

    :return: path to the ``directory`` folder
    """
    owner_only = stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR
    endpoint_path = f"{accounts_path}/{endpoint}"
    directory_path = f"{endpoint_path}/directory"
    for _path in (endpoint_path, directory_path):
        os.makedirs(_path, exist_ok=True)
        os.chmod(_path, owner_only)
    return directory_path


def set_latest_endpoint_account(
    endpoint: str, account: Account, account_path: str, to_clear: list
):
    """
    Removes every directory of ``to_clear`` except ``account_path`` and makes
    sure ``account_path`` exists as a directory.

    :raises: OSError if ``account_path`` exists but is not a directory
    """
    print(f"Latest {endpoint} account: {account.dirname} - {account.account_id}")
    for _dir_to_remove in to_clear:
        if _dir_to_remove != account_path:
            rmtree(_dir_to_remove)
    if not os.path.exists(account_path):
        os.makedirs(account_path, exist_ok=True)
    elif not os.path.isdir(account_path):
        raise OSError(account_path, "exists but is not a directory")


def _write_account_file(file_path: str, content, mode: int) -> None:
    """
    Writes ``content`` to ``file_path`` with ``mode`` when the file cannot be read.

    A readable existing file is never overwritten; a message is printed only
    when its text equals the content that would have been written.
    """
    if isinstance(content, str):
        payload = content
    elif isinstance(content, dict):
        payload = json.dumps(content)
    else:
        # Any other type results in an empty file, as before.
        payload = ""
    try:
        with open(file_path) as fd:
            existing = fd.read()
    except OSError:
        with open(file_path, "w") as fd:
            fd.write(payload)
        os.chmod(file_path, mode)
        return
    if existing == payload:
        print(file_path, "already exists and is identical. Nothing to do")


def write_accounts_files(account_path: str, account: Account) -> None:
    """
    Writes down private_key.json, regr.json and meta.json of the account into
    ``account_path``. The private key is owner read-only.
    """
    print("Import for account", account.account_id, account.endpoint, account_path)
    shared_mode = (
        stat.S_IWRITE | stat.S_IRUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH
    )
    _write_account_file(
        f"{account_path}/{AcmeStore.pkey_file_name}", account.privateKey, stat.S_IRUSR
    )
    _write_account_file(
        f"{account_path}/{AcmeStore.regr_file_name}", account.registration, shared_mode
    )
    _write_account_file(
        f"{account_path}/{AcmeStore.meta_file_name}", account.meta, shared_mode
    )


def find_accounts_dir(certs_root_path, accounts_folder_name: str = "accounts") -> str:
    """
    For a given path, returns the path to ``accounts`` folder.

    :raises: OSError if ``accounts_folder_name`` not found in tree
    :raises: TypeError if the match is not a directory
    """
    for root, dirs, _files in os.walk(certs_root_path):
        if accounts_folder_name not in dirs:
            continue
        dir_path = os.path.join(root, accounts_folder_name)
        if not os.path.isdir(dir_path):
            raise TypeError(dir_path, "is not a directory, yet exists.")
        return dir_path
    raise OSError(
        f"No folder named {accounts_folder_name} found in {certs_root_path} tree"
    )


def get_acme_accounts(
    accounts_path: str,
) -> list[Account]:
    """
    Builds an ``Account`` for every folder under ``accounts_path`` that
    contains a private_key.json file.

    :param str accounts_path:
    :return: list of accounts
    :rtype: list[Account]
    """
    accounts: list = []
    for root, _dirs, files in os.walk(accounts_path):
        if AcmeStore.pkey_file_name not in files:
            continue
        reg_key_content = easy_read(os.path.join(root, AcmeStore.regr_file_name))
        meta_content = easy_read(os.path.join(root, AcmeStore.meta_file_name))
        # Account ID and endpoint both come from the registration URI.
        acme_account = urlparse(json.loads(reg_key_content)["uri"])
        created_on = dateparser.parse(json.loads(meta_content)["creation_dt"]).replace(
            tzinfo=None
        )
        accounts.append(
            Account(
                account_id=os.path.split(acme_account.path)[-1],
                privateKey=easy_read(os.path.join(root, AcmeStore.pkey_file_name)),
                registration=reg_key_content,
                meta=meta_content,
                created_on=created_on.isoformat(),
                # Last path component, whatever the depth of ``accounts_path``.
                dirname=os.path.basename(os.path.normpath(root)),
                endpoint=acme_account.netloc,
            )
        )
    return accounts


def get_meta_creation_dt(account: Account) -> datetime.datetime:
    """
    Returns the ``creation_dt`` of the account metadata as a naive datetime.

    :raises: TypeError if ``account.meta`` is neither a str nor a dict
    """
    if isinstance(account.meta, str):
        metadata = json.loads(account.meta)
    elif isinstance(account.meta, dict):
        metadata = account.meta
    else:
        raise TypeError(
            "account.meta is not valid type. Got",
            type(account.meta),
            "expected one of",
            (str, dict),
        )
    # tzinfo is dropped so that all creation dates compare as naive datetimes.
    return dateparser.parse(metadata["creation_dt"]).replace(tzinfo=None)


def accounts_per_endpoints(config: AcmeConfig) -> dict[str, list[Account]]:
    # Groups the accounts of ``config`` by their ``endpoint``.
    endpoints_to_accounts: dict = {}
    for account in config.accounts:
        if account.endpoint not in endpoints_to_accounts.keys():
            endpoints_to_accounts[account.endpoint]: list = [account]
        else:
            endpoints_to_accounts[account.endpoint].append(account)
    # NOTE(review): the listing is cut off here; a closing
    # ``return endpoints_to_accounts`` is presumably missing -- confirm against upstream.
...
kinesis.py
Source:kinesis.py  
...41    @staticmethod42    def get_deploy_templates():43        def get_delete_params(params, **kwargs):44            return {"StreamName": params["Name"], "EnforceConsumerDeletion": True}45        def _store_arn(result, resource_id, resources, resource_type):46            client = aws_stack.connect_to_service("kinesis")47            stream_name = resources[resource_id]["Properties"]["Name"]48            description = client.describe_stream(StreamName=stream_name)49            while description["StreamDescription"]["StreamStatus"] != "ACTIVE":50                description = client.describe_stream(StreamName=stream_name)51            resources[resource_id]["PhysicalResourceId"] = description["StreamDescription"][52                "StreamARN"53            ]54        return {55            "create": {56                "function": "create_stream",57                "parameters": {"StreamName": "Name", "ShardCount": "ShardCount"},58                "defaults": {"ShardCount": 1},59                "result_handler": _store_arn,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
