Best Python code snippet using localstack_python
impl.py
Source:impl.py  
    @available
    def delete_s3_object(self, s3_path):
        """Delete every object under *s3_path*, if the prefix exists.

        Args:
            s3_path: Full ``s3://bucket/prefix`` path whose objects are removed.
        """
        bucket, prefix = self.split_s3_path(s3_path)
        creds = self.get_creds()
        boto3_session = get_boto3_session(creds.region_name, creds.aws_profile_name)
        s3_client = boto3_session.client("s3")
        s3_resource = boto3_session.resource("s3")
        # Only issue the bulk delete when at least one object exists at the prefix.
        if self.s3_path_exists(s3_path, s3_client):
            logger.info(f"Delete objects from bucket={bucket}, prefix={prefix}")
            s3_resource.Bucket(bucket).objects.filter(Prefix=prefix).delete()

    @available
    def s3_table_location(self, schema_name: str, table_name: str) -> str:
        """Return the S3 location for a table, rendered from the profile's
        ``s3_data_dir`` template.

        Args:
            schema_name: schema (database) name substituted into the template.
            table_name: table name substituted into the template.

        Raises:
            ValueError: if ``s3_data_dir`` is not configured in the profile.
        """
        creds = self.get_creds()
        if creds.s3_data_dir is not None:
            s3_path = creds.s3_data_dir.format(
                schema_name=schema_name, table_name=table_name
            )
            return s3_path
        else:
            raise ValueError("s3_data_dir is required for the profile config")

    @available
    def clean_up_partitions(
        self, database_name: str, table_name: str, where_condition: str
    ):
        """Delete the S3 data of every Glue partition of *table_name* that
        matches *where_condition* (a Glue partition filter expression).
        """
        # Look up Glue partitions & clean up
        creds = self.get_creds()
        boto3_session = get_boto3_session(creds.region_name, creds.aws_profile_name)
        # NOTE(review): client creation is serialized via this lock —
        # presumably because boto3 client construction is not thread-safe; confirm.
        with boto3_client_lock:
            glue_client = boto3_session.client("glue")
        s3_resource = boto3_session.resource("s3")
        partitions = glue_client.get_partitions(
            # CatalogId='123456789012', # Need to make this configurable if it is different from default AWS Account ID
            DatabaseName=database_name,
            TableName=table_name,
            Expression=where_condition,
        )
        # Partition locations look like s3://bucket/prefix; capture both parts.
        p = re.compile("s3://([^/]*)/(.*)")
        for partition in partitions["Partitions"]:
            logger.debug(
                "Deleting objects for partition '{}' at '{}'",
                partition["Values"],
                partition["StorageDescriptor"]["Location"],
            )
            m = p.match(partition["StorageDescriptor"]["Location"])
            if m is not None:
                bucket_name = m.group(1)
                prefix = m.group(2)
                s3_bucket = s3_resource.Bucket(bucket_name)
                s3_bucket.objects.filter(Prefix=prefix).delete()

    @available
    def clean_up_table(self, database_name: str, table_name: str):
        """Delete the S3 data backing the Glue table *table_name*; a missing
        table is ignored.
        """
        # Look up Glue partitions & clean up
        creds = self.get_creds()
        boto3_session = get_boto3_session(creds.region_name, creds.aws_profile_name)
        with boto3_client_lock:
            glue_client = boto3_session.client("glue")
        try:
            table = glue_client.get_table(DatabaseName=database_name, Name=table_name)
        except ClientError as e:
            if e.response["Error"]["Code"] == "EntityNotFoundException":
                logger.debug("Table '{}' does not exists - Ignoring", table_name)
                return
            # NOTE(review): any other ClientError code falls through with
            # `table` unbound, which would raise NameError below — confirm
            # whether the exception should be re-raised instead.
        if table is not None:
            logger.debug(
                "Deleting table data from'{}'",
                table["Table"]["StorageDescriptor"]["Location"],
            )
            p = re.compile("s3://([^/]*)/(.*)")
            # NOTE(review): the source snippet is truncated at this point;
            # the remainder of clean_up_table is not visible here.
Source:schema_validator.py  
from abc import ABC, abstractmethod

import boto3
import awswrangler as wr  # Ensure Lambda has an AWS Wrangler Layer configured

from ..commons import init_logger

logger = init_logger(__name__)


class GlueSchemaValidator(ABC):
    """
    Abstract class to validate objects against Glue Table schemas.
    """

    def __init__(self, boto3_session=None):
        """
        Initializes Glue client based on the supplied or a new session.
        Args:
            boto3_session: Boto3 session
        """
        self.boto3_session = boto3_session
        # Reuse the caller's session, or fall back to the default
        # boto3 credential chain when none is supplied.
        self.glue_client = boto3_session.client(
            'glue') if boto3_session else boto3.client('glue')

    def _get_table_parameters(self, database_name, table_name):
        """
        Returns the parameters of the Glue table
        Args:
            database_name: Glue database name
            table_name: Glue table name
        Returns: dict of table parameters
        """
        return self.glue_client.get_table(
            DatabaseName=database_name, Name=table_name)['Table']['Parameters']

    def _get_table_schema(self, database_name, table_name):
        """
        Returns column names and respective types of the Glue table
        Args:
            database_name: Glue database name
            table_name: Glue table name
        Returns: list of dicts of a form { 'Name': ..., 'Type': ...}
        """
        return self.glue_client.get_table(
            DatabaseName=database_name,
            Name=table_name)['Table']['StorageDescriptor']['Columns']

    @abstractmethod
    def validate(self, prefix, keys, database_name, table_name):
        """
        Validates the object(s) against the Glue schema
        Args:
            prefix: S3 prefix
            keys: list of S3 keys
            database_name: Glue database name
            table_name: Glue table name
        Returns: validation result: True or False
        """
        pass


class ParquetSchemaValidator(GlueSchemaValidator):

    def validate(self, prefix, keys, database_name, table_name):
        """
        Validates the Parquet S3 object(s) against the Glue schema
        Args:
            prefix: S3 prefix
            keys: list of S3 keys
            database_name: Glue database name
            table_name: Glue table name
        Returns: validation result: True or False
        """
        # Retrieve table parameters
        table_parameters = self._get_table_parameters(
            database_name, table_name)
        logger.info(f"Table parameters: {table_parameters}")
        # Parse table parameters
        validate_schema, validate_latest = self._parse_table_parameters(
            table_parameters)
        if validate_schema:
            # Retrieve table schema.
            # Columns must be sorted in order to compare the schema because
            # Parquet does not respect the order.
            table_schema = sorted(
                self._get_table_schema(database_name, table_name),
                key=lambda x: x['Name'])
            logger.info(f"Table schema: {table_schema}")
            # Retrieve object schema
            object_schema = self._get_object_schema(
                prefix, keys, validate_latest)
            logger.info(
                f"Object prefix: {prefix}, keys: {keys}, schema: {object_schema}")
            if table_schema != object_schema:
                return False
        return True

    def _get_object_schema(self, prefix, keys, get_latest):
        """
        Retrieves object schema from a Parquet file
        Args:
            prefix: S3 prefix
            keys: list of S3 keys
            get_latest: Flag to get only latest object (otherwise - get all available objects)
        Returns:  list of dicts of a form { 'Name': ..., 'Type': ...}
        """
        # Retrieve object metadata; prefer explicit keys when provided,
        # otherwise describe everything under the prefix.
        s3_objects = wr.s3.describe_objects(
            path=keys if keys else prefix,
            boto3_session=self.boto3_session
        )
        # Sort by last modified date and filter out empty objects
        object_keys = [object_key for object_key, object_metadata in sorted(
            s3_objects.items(),
            key=lambda k_v: k_v[1]['LastModified']
        ) if object_metadata['ContentLength'] > 0]
        # Retrieve Parquet metadata.
        # NOTE(review): object_keys is sorted by LastModified ascending, so
        # [0] is the OLDEST object, while the flag is named get_latest —
        # confirm whether [-1] (or reverse=True) was intended.
        column_types, _ = wr.s3.read_parquet_metadata(
            path=object_keys[0] if get_latest else object_keys,
            boto3_session=self.boto3_session
        )
        # Columns must be sorted in order to compare the schema because Parquet
        # does not respect the order; names are lower-cased to match Glue's.
        return sorted(
            [{'Name': name.lower(), 'Type': col_type}
             for name, col_type in column_types.items()],
            key=lambda x: x['Name'])

    @staticmethod
    def _parse_table_parameters(table_parameters):
        """
        Retrieves specific table parameters and provides defaults
        Args:
            table_parameters: table parameters dict (may be None or empty)
        Returns: tuple of Booleans validate_schema, validate_latest
        """
        # A missing dict, missing key, or any value other than the string
        # 'true' defaults the flag to False.
        validate_schema = bool(table_parameters) and \
            table_parameters.get('validate_schema') == 'true'
        validate_latest = bool(table_parameters) and \
            table_parameters.get('validate_latest') == 'true'
        # BUG FIX: the return was missing, so validate() would raise
        # TypeError unpacking None from this call.
        return validate_schema, validate_latest
Source:secrets.py  
1import json2import os3import boto34from logger import logger5def export(boto3_session=None, **params):6    secret = fetch(boto3_session, **params)7    os.environ.update(**secret)8def fetch(boto3_session=None, **params):9    boto3_session = boto3_session or boto3.Session()10    secrets = boto3_session.client('secretsmanager')11    logger.info('GET SECRET %s', json.dumps(params))12    secret = json.loads(secrets.get_secret_value(**params)['SecretString'])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
