How to use the _store_arn method in localstack

Best Python code snippet using localstack_python

acme_store.py

Source:acme_store.py Github

copy

Full Screen

# SPDX-License-Identifier: MPL-2.0
# Copyright 2022 John Mille <john@ews-network.net>

"""
Manages the config-dir/accounts settings to avoid creating new accounts for every new certificate.
"""

# NOTE(review): this listing was recovered from a flattened copy in which the
# original indentation was lost. Statement nesting was reconstructed from the
# code's syntax; spots where more than one nesting was plausible are marked
# with NOTE(review) below and should be confirmed against the upstream file.

import datetime
import json
import os
import stat
from shutil import rmtree
from tempfile import TemporaryDirectory
from urllib.parse import urlparse

from boto3.session import Session
from compose_x_common.aws import get_session
from dateutil import parser as dateparser

from certbot_aws_store.acme_config import Account, AcmeConfig
from certbot_aws_store.backends import SECRET_ARN_RE
from certbot_aws_store.utils import easy_read

# Permissions applied to newly written account files.
_PRIVATE_KEY_MODE = stat.S_IRUSR
_SHARED_FILE_MODE = (
    stat.S_IWRITE | stat.S_IRUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH
)
# Permissions applied to the endpoint directories (owner read/write/execute).
_OWNER_ONLY_DIR_MODE = stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR


class AcmeStore:
    """
    Keeps ACME account files in a local ``config-dir/accounts`` tree in sync
    with a Secrets Manager secret.

    On construction the secret is pulled (if it exists) and its accounts are
    laid out on disk; :meth:`save` writes the accounts found on disk, merged
    with the ones pulled at start-up, back to the secret.
    """

    pkey_file_name = "private_key.json"
    regr_file_name = "regr.json"
    meta_file_name = "meta.json"
    files = {
        pkey_file_name: "privateKey",
        regr_file_name: "registration",
        meta_file_name: "metadata",
    }
    accounts_dir_name: str = "accounts"

    def __init__(
        self,
        certs_store_arn: str,
        override_directory: str = None,
        session: Session = None,
    ):
        """
        :param str certs_store_arn: ARN or name of the secret holding the accounts
        :param str override_directory: existing working directory to use instead
            of a fresh temporary one
        :param Session session: boto3 session, passed through ``get_session``
        """
        self._store_arn = certs_store_arn
        self.session = get_session(session)
        self.staging_account = None
        self.account = None
        self.backup_config = None
        if not override_directory:
            # Keep a reference to the TemporaryDirectory so it is not cleaned
            # up while this store is alive.
            self.temp_dir = TemporaryDirectory()
            self.directory = self.temp_dir.name
            self.accounts_path = f"{self.config_dir}/{self.accounts_dir_name}"
            os.makedirs(self.accounts_path, exist_ok=True)
        else:
            self.directory = override_directory
            try:
                self.accounts_path = find_accounts_dir(override_directory)
            except OSError:
                self.accounts_path = f"{self.config_dir}/{self.accounts_dir_name}"
            if not os.path.exists(self.accounts_path):
                os.makedirs(self.accounts_path, exist_ok=True)
        self.init_pull()

    @property
    def existing_accounts(self) -> list[Account]:
        """Accounts currently present under ``accounts_path``."""
        return get_acme_accounts(self.accounts_path)

    @property
    def config_dir(self) -> str:
        return f"{self.directory}/config-dir"

    @property
    def logs_dir(self) -> str:
        return f"{self.directory}/logs-dir"

    @property
    def work_dir(self) -> str:
        return f"{self.directory}/work-dir"

    @property
    def config(self) -> AcmeConfig:
        """An ``AcmeConfig`` rebuilt from the account files on disk at each access."""
        accounts = get_acme_accounts(self.accounts_path)
        return AcmeConfig(accounts=accounts)

    def init_pull(
        self,
    ):
        """
        Fetches the secret and writes its accounts to disk, then selects the
        accounts to use. A missing secret is reported and otherwise ignored;
        any other error is printed and re-raised.
        """
        client = self.session.client("secretsmanager")
        try:
            config_r = client.get_secret_value(SecretId=self.secret_name)
            config_content = json.loads(config_r["SecretString"])
            config_content["store_arn"] = config_r["ARN"]
            self._store_arn = config_r["ARN"]
            config = AcmeConfig(**config_content)
            self.layout_accounts_folders(config)
        except client.exceptions.ResourceNotFoundException:
            print(
                f"Secret {self._store_arn} does not exist. Will initialize after first certificate"
            )
        except Exception as error:
            print(error)
            raise
        self.set_execution_accounts()

    def layout_accounts_folders(self, config: AcmeConfig):
        """
        Writes every account of ``config`` under ``accounts_path`` and records
        the resulting on-disk state in ``backup_config``.
        """
        for endpoint, accounts in accounts_per_endpoints(config).items():
            directory_path = create_account_endpoint_dirs(self.accounts_path, endpoint)
            for account in accounts:
                _account_path = f"{directory_path}/{account.dirname}"
                os.makedirs(_account_path, exist_ok=True)
                write_accounts_files(_account_path, account)
        # NOTE(review): placed after the loops; confirm against upstream.
        self.backup_config = AcmeConfig(accounts=get_acme_accounts(self.accounts_path))

    def set_execution_accounts(self):
        """
        For each endpoint keeps only the most recently created account on disk
        and stores it in ``staging_account`` or ``account``.
        """
        now = datetime.datetime.now(tz=None)
        for endpoint, accounts in accounts_per_endpoints(self.config).items():
            endpoint_dir = (
                f"{self.config_dir}/{self.accounts_dir_name}" f"/{endpoint}/directory"
            )
            latest_account = accounts[0]
            latest_account_path = f"{endpoint_dir}/{accounts[0].dirname}"
            diff = now - get_meta_creation_dt(latest_account)
            to_clear: list = []
            for account in accounts:
                _account_path = f"{endpoint_dir}/{account.dirname}"
                to_clear.append(_account_path)
                age = now - get_meta_creation_dt(account)
                # A smaller age means a more recent account.
                if not diff or diff > age:
                    diff = age
                    latest_account = account
                    latest_account_path = _account_path
            set_latest_endpoint_account(
                endpoint, latest_account, latest_account_path, to_clear
            )
            if latest_account.is_staging():
                self.staging_account = latest_account
            else:
                self.account = latest_account

    @property
    def secret_name(self) -> str:
        """The ``name`` group of the ARN when ``_store_arn`` matches ``SECRET_ARN_RE``, else ``_store_arn`` itself."""
        arn_match = SECRET_ARN_RE.match(self._store_arn)
        if arn_match:
            return arn_match.group("name")
        return self._store_arn

    def save(self):
        """
        Writes the merged accounts to the secret, creating the secret when it
        does not exist yet. Other errors are printed and not re-raised.
        """
        client = self.session.client("secretsmanager")
        # Merge once: each call re-reads the account files from disk.
        accounts = self.merge_used_accounts_with_backup()
        secret_value = AcmeConfig(accounts=accounts).json()
        print(
            "Saving accounts",
            [(_act.account_id, _act.endpoint) for _act in accounts],
        )
        try:
            client.put_secret_value(SecretId=self._store_arn, SecretString=secret_value)
        except client.exceptions.ResourceNotFoundException:
            client.create_secret(
                Name=self.secret_name,
                SecretString=secret_value,
                Tags=[
                    {"Key": "Name", "Value": self.secret_name},
                    {"Key": "certbot_route53_store", "Value": str(True)},
                ],
            )
        except Exception as error:
            print(error)

    def merge_used_accounts_with_backup(self) -> list[Account]:
        """
        Returns the accounts on disk plus the backed-up accounts that are no
        longer on disk.

        A backup account is added at most once, only for endpoints that still
        have an account on disk, and only when no on-disk account has the same
        endpoint and account id.
        """
        if not self.backup_config or not isinstance(self.backup_config, AcmeConfig):
            return self.config.accounts
        merged = list(self.config.accounts)
        endpoints = {account.endpoint for account in merged}
        known = {(account.endpoint, account.account_id) for account in merged}
        for _bkp_account in self.backup_config.accounts:
            key = (_bkp_account.endpoint, _bkp_account.account_id)
            if _bkp_account.endpoint not in endpoints or key in known:
                continue
            known.add(key)
            merged.append(_bkp_account)
        return merged


def create_account_endpoint_dirs(accounts_path: str, endpoint: str) -> str:
    """
    Ensures ``<accounts_path>/<endpoint>/directory`` exists and returns its path.
    """
    endpoint_path = f"{accounts_path}/{endpoint}"
    if not os.path.exists(endpoint_path):
        os.makedirs(endpoint_path, exist_ok=True)
    # NOTE(review): chmod applied unconditionally; it may have been nested
    # under the ``if`` above in the upstream file.
    os.chmod(f"{endpoint_path}", _OWNER_ONLY_DIR_MODE)
    directory_path = f"{endpoint_path}/directory"
    if not os.path.exists(directory_path):
        os.makedirs(directory_path, exist_ok=True)
    os.chmod(f"{directory_path}", _OWNER_ONLY_DIR_MODE)
    return directory_path


def set_latest_endpoint_account(
    endpoint: str, account: Account, account_path: str, to_clear: list
):
    """
    Removes every directory of ``to_clear`` except ``account_path`` and makes
    sure ``account_path`` is a directory.

    :raises: OSError if ``account_path`` exists but is not a directory
    """
    print(f"Latest {endpoint} account: {account.dirname} - {account.account_id}")
    for _dir_to_remove in to_clear:
        if _dir_to_remove == account_path:
            continue
        rmtree(_dir_to_remove)
    if not os.path.exists(account_path):
        os.makedirs(account_path, exist_ok=True)
    elif not os.path.isdir(account_path):
        raise OSError(account_path, "exists but is not a directory")


def _matches_existing(existing: str, content) -> bool:
    """Tells whether the text read from disk represents ``content``."""
    if isinstance(content, dict):
        try:
            return json.loads(existing) == content
        except ValueError:
            return False
    return existing == content


def _write_account_file(file_path: str, content, mode: int) -> None:
    """
    Writes ``content`` to ``file_path`` with ``mode`` when the file cannot be
    read; an existing file is never overwritten.
    """
    try:
        with open(file_path) as fd:
            existing = fd.read()
    except OSError:
        with open(file_path, "w") as fd:
            if isinstance(content, str):
                fd.write(content)
            elif isinstance(content, dict):
                fd.write(json.dumps(content))
        # NOTE(review): chmod applied only to files created here; confirm
        # against upstream.
        os.chmod(file_path, mode)
        return
    if _matches_existing(existing, content):
        print(file_path, "already exists and is identical. Nothing to do")


def write_accounts_files(account_path: str, account: Account) -> None:
    """
    Writes down the private_key.json, regr.json and meta.json of the account
    into ``account_path``.
    """
    print("Import for account", account.account_id, account.endpoint, account_path)
    _write_account_file(
        f"{account_path}/{AcmeStore.pkey_file_name}",
        account.privateKey,
        _PRIVATE_KEY_MODE,
    )
    _write_account_file(
        f"{account_path}/{AcmeStore.regr_file_name}",
        account.registration,
        _SHARED_FILE_MODE,
    )
    _write_account_file(
        f"{account_path}/{AcmeStore.meta_file_name}",
        account.meta,
        _SHARED_FILE_MODE,
    )


def find_accounts_dir(certs_root_path, accounts_folder_name: str = "accounts") -> str:
    """
    For a given path, returns the path to ``accounts`` folder.

    :raises: OSError if ``accounts_folder_name`` not found in tree
    """
    for root, dirs, files in os.walk(certs_root_path):
        for _dir in dirs:
            if _dir == accounts_folder_name:
                dir_path = os.path.join(root, _dir)
                if not os.path.isdir(dir_path):
                    raise TypeError(dir_path, "is not a directory, yet exists.")
                return dir_path
    raise OSError(
        f"No folder named {accounts_folder_name} found in {certs_root_path} tree"
    )


def get_acme_accounts(
    accounts_path: str,
) -> list[Account]:
    """
    Builds an ``Account`` for every directory under ``accounts_path`` that
    contains a private key file.

    :param str accounts_path:
    :return: list of accounts
    :rtype: list[Account]
    """
    accounts: list = []
    for root, _dirs, files in os.walk(accounts_path):
        if AcmeStore.pkey_file_name not in files:
            continue
        private_key_path = os.path.join(root, AcmeStore.pkey_file_name)
        account_dir = os.path.split(private_key_path)[0]
        reg_key_content = easy_read(os.path.join(account_dir, AcmeStore.regr_file_name))
        meta_content = easy_read(os.path.join(account_dir, AcmeStore.meta_file_name))
        # The account id is the last path segment of the registration URI.
        acme_account = urlparse(json.loads(reg_key_content)["uri"])
        acme_account_id = os.path.split(acme_account.path)[-1]
        created_on = dateparser.parse(json.loads(meta_content)["creation_dt"])
        accounts.append(
            Account(
                account_id=acme_account_id,
                privateKey=easy_read(private_key_path),
                registration=reg_key_content,
                meta=meta_content,
                created_on=created_on.replace(tzinfo=None).isoformat(),
                # Last component of the account directory, whatever its depth.
                dirname=os.path.basename(os.path.normpath(account_dir)),
                endpoint=acme_account.netloc,
            )
        )
    return accounts


def get_meta_creation_dt(account: Account) -> datetime.datetime:
    """
    Returns the ``creation_dt`` of the account metadata as a naive datetime
    (the timezone information is dropped, not converted).

    :raises: TypeError if ``account.meta`` is neither a str nor a dict
    """
    if isinstance(account.meta, str):
        metadata = json.loads(account.meta)
    elif isinstance(account.meta, dict):
        metadata = account.meta
    else:
        raise TypeError(
            "account.meta is not valid type. Got",
            type(account.meta),
            "expected one of",
            (str, dict),
        )
    return dateparser.parse(metadata["creation_dt"]).replace(tzinfo=None)


def accounts_per_endpoints(config: AcmeConfig) -> dict[str, list[Account]]:
    endpoints_to_accounts: dict = {}
    for account in config.accounts:
        if account.endpoint not in endpoints_to_accounts.keys():
            endpoints_to_accounts[account.endpoint]: list = [account]
        else:
            endpoints_to_accounts[account.endpoint].append(account)
    # ... (the source listing is truncated here; the rest of this function is not shown)

Full Screen

Full Screen

kinesis.py

Source:kinesis.py Github

copy

Full Screen

...41 @staticmethod42 def get_deploy_templates():43 def get_delete_params(params, **kwargs):44 return {"StreamName": params["Name"], "EnforceConsumerDeletion": True}45 def _store_arn(result, resource_id, resources, resource_type):46 client = aws_stack.connect_to_service("kinesis")47 stream_name = resources[resource_id]["Properties"]["Name"]48 description = client.describe_stream(StreamName=stream_name)49 while description["StreamDescription"]["StreamStatus"] != "ACTIVE":50 description = client.describe_stream(StreamName=stream_name)51 resources[resource_id]["PhysicalResourceId"] = description["StreamDescription"][52 "StreamARN"53 ]54 return {55 "create": {56 "function": "create_stream",57 "parameters": {"StreamName": "Name", "ShardCount": "ShardCount"},58 "defaults": {"ShardCount": 1},59 "result_handler": _store_arn,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful