How to use dynamodb_stream_arn method in localstack

Best Python code snippet using localstack_python

resource.py

Source:resource.py Github

copy

Full Screen

1from typing import Optional2import boto33from aws_cdk.aws_dynamodb import Table4from aws_cdk.aws_iam import Effect, PolicyDocument, PolicyStatement, Role, ServicePrincipal5from aws_cdk.aws_kms import Key6from aws_cdk.aws_lambda import Code, Function, Runtime, SingletonFunction, StartingPosition7from aws_cdk.aws_lambda_event_sources import DynamoEventSource8from aws_cdk.aws_logs import RetentionDays9from aws_cdk.core import Construct, CustomResource, Duration, RemovalPolicy10from b_cfn_elasticsearch_index.resource import ElasticsearchIndexResource11from b_elasticsearch_layer.layer import Layer as BElasticsearchLayer12from b_cfn_elasticsearch_cloner.cloner_source import root as cloner_root13from b_cfn_elasticsearch_cloner.initial_cloner_source import root as initial_cloner_root14class ElasticsearchCloner(Construct):15 """16 Custom resource used for managing an Elasticsearch cloner.17 Creates a cloner and clones existing data on stack creation.18 Updates the cloner on settings change.19 Deletes the cloner on stack deletion.20 :param sagemaker_endpoint_name: The name of Sagemaker inference endpoint used to provide NLP features.21 :param sagemaker_endpoint_arn: Optional. SageMaker inference endpoint ARN. 
By default it is resolved22 automatically with via ``sagemaker_endpoint_name`` parameter.23 :param sagemaker_embeddings_key: Key under which embeddings are received in Sagemaker response.24 """25 def __init__(26 self,27 scope: Construct,28 id: str,29 elasticsearch_index: ElasticsearchIndexResource,30 dynamodb_table: Table,31 kms_key: Optional[Key] = None,32 *,33 sagemaker_endpoint_name: str = None,34 sagemaker_endpoint_arn: str = None,35 sagemaker_embeddings_key: str = None36 ) -> None:37 super().__init__(scope=scope, id=id)38 elasticsearch_layer = BElasticsearchLayer(scope=self, name=f"{id}ElasticsearchLayer")39 if bool(sagemaker_endpoint_name) ^ bool(sagemaker_embeddings_key):40 raise ValueError(41 f'In order to use sentence embedding, all of the following enviroment variables are required: '42 f'SAGEMAKER_ENDPOINT_NAME, SAGEMAKER_EMBEDDINGS_KEY. '43 f'Else, provide none of above.'44 )45 if sagemaker_endpoint_name and not sagemaker_endpoint_arn:46 sagemaker_endpoint_arn = self.__resolve_sagemaker_endpoints_arn('*')47 optional_sagemaker_parameters = {48 'SAGEMAKER_ENDPOINT_NAME': sagemaker_endpoint_name or None,49 'SAGEMAKER_EMBEDDINGS_KEY': sagemaker_embeddings_key or None50 }51 initial_cloner_function = SingletonFunction(52 scope=self,53 id='InitialClonerFunction',54 uuid='e01116a4-f939-43f2-8f5b-cc9f862c9e01',55 lambda_purpose='InitialClonerSingletonLambda',56 code=Code.from_asset(initial_cloner_root),57 handler='index.handler',58 runtime=Runtime.PYTHON_3_8,59 layers=[elasticsearch_layer],60 log_retention=RetentionDays.ONE_MONTH,61 memory_size=128,62 timeout=Duration.minutes(15),63 role=Role(64 scope=self,65 id='InitialClonerFunctionRole',66 assumed_by=ServicePrincipal('lambda.amazonaws.com'),67 inline_policies={68 'LogsPolicy': PolicyDocument(69 statements=[70 PolicyStatement(71 actions=[72 'logs:CreateLogGroup',73 'logs:CreateLogStream',74 'logs:PutLogEvents',75 'logs:DescribeLogStreams',76 ],77 resources=['arn:aws:logs:*:*:*'],78 effect=Effect.ALLOW,79 )80 
]81 ),82 'ElasticsearchPolicy': PolicyDocument(83 statements=[84 PolicyStatement(85 actions=[86 'es:ESHttpDelete',87 'es:ESHttpGet',88 'es:ESHttpHead',89 'es:ESHttpPatch',90 'es:ESHttpPost',91 'es:ESHttpPut',92 ],93 resources=['*'],94 effect=Effect.ALLOW,95 )96 ]97 ),98 'DynamodbPolicy': PolicyDocument(99 statements=[100 PolicyStatement(101 actions=['dynamodb:*'],102 resources=['*'],103 effect=Effect.ALLOW,104 )105 ]106 ),107 },108 description='Role for DynamoDB Initial Cloner Function',109 ),110 )111 if kms_key:112 initial_cloner_function.add_to_role_policy(113 PolicyStatement(114 actions=['kms:Decrypt'],115 resources=[kms_key.key_arn],116 effect=Effect.ALLOW,117 ),118 )119 initial_cloner = CustomResource(120 scope=self,121 id='InitialCloner',122 service_token=initial_cloner_function.function_arn,123 removal_policy=RemovalPolicy.DESTROY,124 properties={125 'DynamodbTableName': dynamodb_table.table_name,126 'ElasticsearchIndexName': elasticsearch_index.index_name,127 'ElasticsearchEndpoint': elasticsearch_index.elasticsearch_domain.domain_endpoint,128 },129 resource_type='Custom::ElasticsearchInitialCloner',130 )131 primary_key_field = initial_cloner.get_att_string('PrimaryKeyField')132 dynamodb_stream_arn = dynamodb_table.table_stream_arn133 if not dynamodb_stream_arn:134 raise Exception('DynamoDB streams must be enabled for the table')135 dynamodb_event_source = DynamoEventSource(136 table=dynamodb_table,137 starting_position=StartingPosition.LATEST,138 enabled=True,139 max_batching_window=Duration.seconds(10),140 bisect_batch_on_error=True,141 parallelization_factor=2,142 batch_size=1000,143 retry_attempts=10,144 )145 cloner_inline_policies = {146 'LogsPolicy': PolicyDocument(147 statements=[148 PolicyStatement(149 actions=[150 'logs:CreateLogGroup',151 'logs:CreateLogStream',152 'logs:PutLogEvents',153 'logs:DescribeLogStreams',154 ],155 resources=['arn:aws:logs:*:*:*'],156 effect=Effect.ALLOW,157 )158 ]159 ),160 'ElasticsearchPolicy': PolicyDocument(161 
statements=[162 PolicyStatement(163 actions=[164 'es:ESHttpDelete',165 'es:ESHttpGet',166 'es:ESHttpHead',167 'es:ESHttpPatch',168 'es:ESHttpPost',169 'es:ESHttpPut',170 ],171 resources=[f'{elasticsearch_index.elasticsearch_domain.domain_arn}/*'],172 effect=Effect.ALLOW,173 )174 ]175 ),176 'DynamodbStreamsPolicy': PolicyDocument(177 statements=[178 PolicyStatement(179 actions=[180 'dynamodb:DescribeStream',181 'dynamodb:GetRecords',182 'dynamodb:GetShardIterator',183 'dynamodb:ListStreams',184 ],185 resources=[dynamodb_stream_arn],186 effect=Effect.ALLOW,187 )188 ]189 ),190 }191 if sagemaker_endpoint_arn:192 cloner_inline_policies['SagemakerPolicy'] = PolicyDocument(193 statements=[194 PolicyStatement(195 actions=[196 'sagemaker:InvokeEndpoint'197 ],198 resources=[sagemaker_endpoint_arn],199 effect=Effect.ALLOW200 )201 ]202 )203 cloner_function = Function(204 scope=self,205 id='ClonerFunction',206 code=Code.from_asset(cloner_root),207 handler='index.handler',208 runtime=Runtime.PYTHON_3_8,209 environment={210 'ES_INDEX_NAME': elasticsearch_index.index_name,211 'ES_DOMAIN_ENDPOINT': elasticsearch_index.elasticsearch_domain.domain_endpoint,212 'PRIMARY_KEY_FIELD': primary_key_field,213 **{214 k: optional_sagemaker_parameters[k] for k in optional_sagemaker_parameters215 if all(optional_sagemaker_parameters.values())216 }217 },218 events=[dynamodb_event_source],219 layers=[elasticsearch_layer],220 log_retention=RetentionDays.ONE_MONTH,221 memory_size=128,222 role=Role(223 scope=self,224 id='ClonerFunctionRole',225 assumed_by=ServicePrincipal('lambda.amazonaws.com'),226 inline_policies=cloner_inline_policies,227 description='Role for DynamoDB Cloner Function',228 ),229 timeout=Duration.seconds(30),230 )231 if kms_key:232 cloner_function.add_to_role_policy(233 PolicyStatement(234 actions=['kms:Decrypt'],235 resources=[kms_key.key_arn],236 effect=Effect.ALLOW,237 )238 )239 @staticmethod240 def __resolve_sagemaker_endpoints_arn(endpoint_name: str) -> str:241 sts_client = 
boto3.client('sts')242 region = sts_client.meta.region_name243 account = sts_client.get_caller_identity()['Account']...

Full Screen

Full Screen

config.py

Source:config.py Github

copy

Full Screen

1from pydantic import BaseSettings, validator, SecretStr, HttpUrl, PostgresDsn2from typing import Optional, Any, Dict3import os4class AppSettings(BaseSettings):5 # Init User6 WEBMASTER_EMAIL: str = os.getenv("WEBMASTER_EMAIL")7 WEBMASTER_PASSWORD: str = os.getenv("WEBMASTER_PASSWORD")8 # DB9 POSTGRES_SERVER: str = os.getenv("POSTGRES_SERVER")10 POSTGRES_USER: str = os.getenv("POSTGRES_USER")11 POSTGRES_PASSWORD: str = os.getenv("POSTGRES_PASSWORD")12 POSTGRES_DB: str = os.getenv("POSTGRES_DB")13 SQLALCHEMY_DATABASE_URI: Optional[PostgresDsn] = None14 @validator("SQLALCHEMY_DATABASE_URI", pre=True)15 def assemble_db_connection(cls, val: Optional[str], values: Dict[str, Any]) -> Any:16 if isinstance(val, str):17 return val18 if not values.get("POSTGRES_SERVER") :19 return val20 return PostgresDsn.build(21 scheme="postgresql",22 user=values.get("POSTGRES_USER"),23 password=values.get("POSTGRES_PASSWORD"),24 host=values.get("POSTGRES_SERVER"),25 path=f"/{values.get('POSTGRES_DB') or ''}",26 )27 # Security28 secret_key: SecretStr = os.getenv("SecretStr")29 jwt_token_prefix: str = "Token" # token? Bearer ?30 # sentry31 SENTRY_DSN: Optional[HttpUrl] = os.getenv("SENTRY_DSN")32 @validator("SENTRY_DSN", pre=True)33 def sentry_dsn_can_be_blank(cls, v: str) -> Optional[str]:34 if not v : return v35 if len(v) == 0:36 return None37 return v38 # databrick39 DATABRICKS_WORKSPACE_URL: HttpUrl = os.getenv("DATABRICKS_WORKSPACE_URL")40 DATABRICKS_TOKEN: str = os.getenv("DATABRICKS_TOKEN")41 DATABRICKS_JOB_API_VERSION: str = os.getenv("DATABRICKS_JOB_API_VERSION")42 # Dynamo43 DYNAMO_TABLE_NAME:str = os.environ.get('APP_TABLE_NAME', '')44 DYNAMODB_STREAM_ARN:str = os.environ.get('DYNAMODB_STREAM_ARN', '')...

Full Screen

Full Screen

env.py

Source:env.py Github

copy

Full Screen

import os
import subprocess

# Name of the environment variable filled during cdk deployment.
ENV_DYNAMODB_STREAM_ARN = 'DYNAMODB_STREAM_ARN'
DGB_CHALICE = 'DGB_CHALICE'


def get_dynamodb_stream_arn():
    """Resolve the DynamoDB stream ARN.

    Prefers the ``DYNAMODB_STREAM_ARN`` environment variable (set during cdk
    deployment on AWS). When it is absent — the local development chalice run —
    falls back to the AWS CLI and takes the first stream listed.

    :return: The stream ARN as a string (may be empty/``"None"`` if the CLI
        finds no streams).
    """
    if ENV_DYNAMODB_STREAM_ARN in os.environ:
        return os.environ[ENV_DYNAMODB_STREAM_ARN]

    # DynamoDB streams stay listed for 24 hours after deletion of the main
    # table. We take the first stream and deliberately do not check that it is
    # ENABLED: this branch exists for development (chalice) runs only; on AWS
    # the ARN comes from the environment variable above.
    #
    # BUG FIX: the original decoded raw stdout, which with the CLI's default
    # JSON output format is the ARN wrapped in double quotes plus a trailing
    # newline — callers received a malformed ARN. Request plain text output
    # and strip surrounding whitespace.
    completed = subprocess.run(
        ['aws', 'dynamodbstreams', 'list-streams',
         '--query', 'Streams[0].StreamArn', '--output', 'text'],
        stdout=subprocess.PIPE,
    )
    return completed.stdout.decode().strip()

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful