Best Python code snippet using localstack_python
plugin.py
Source:plugin.py  
import functools
import logging
from uuid import uuid4

import boto3
from botocore.client import ClientError, Config
from sentry.integrations import FeatureDescription, IntegrationFeatures
from sentry.plugins.bases.data_forwarding import DataForwardingPlugin
from sentry.utils import json, metrics
from sentry_plugins.base import CorePluginMixin
from sentry_plugins.utils import get_secret_field_config

logger = logging.getLogger(__name__)

DESCRIPTION = """
Forward Sentry events to Amazon SQS.
Amazon Simple Queue Service (SQS) is a fully managed message
queuing service that enables you to decouple and scale microservices,
distributed systems, and serverless applications.
"""

# Messages longer than this are skipped instead of being sent to SQS.
# NOTE(review): the comparison below uses len() on the serialized string, i.e.
# characters; that equals bytes only if json.dumps emits ASCII -- confirm.
SQS_MAX_MESSAGE_SIZE = 256 * 1024


def get_regions():
    """Return the SQS region names of the default partition followed by ``aws-cn``."""
    session = boto3.session.Session()
    public_region_list = session.get_available_regions("sqs")
    cn_region_list = session.get_available_regions("sqs", partition_name="aws-cn")
    return public_region_list + cn_region_list


def track_response_metric(fn):
    """Decorate ``fn`` so every call increments ``data-forwarding.http_response``.

    The ``success`` tag is whatever ``fn`` returned, or ``False`` when ``fn``
    raised; the exception is re-raised after the metric is recorded.
    """

    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Queue.send_message
    # boto3's send_message doesn't return success/fail or http codes
    # success is a boolean based on whether there was an exception or not
    @functools.wraps(fn)  # keep the wrapped function's __name__ / __doc__
    def wrapper(*args, **kwargs):
        try:
            success = fn(*args, **kwargs)
            metrics.incr(
                "data-forwarding.http_response", tags={"plugin": "amazon-sqs", "success": success}
            )
        except Exception:
            metrics.incr(
                "data-forwarding.http_response", tags={"plugin": "amazon-sqs", "success": False}
            )
            raise
        return success

    return wrapper


class AmazonSQSPlugin(CorePluginMixin, DataForwardingPlugin):
    """Data-forwarding plugin that sends event payloads to an Amazon SQS queue.

    When an S3 bucket is configured, the payload is uploaded to S3 and the SQS
    message only carries the object URL and the event ID.
    """

    title = "Amazon SQS"
    slug = "amazon-sqs"
    description = DESCRIPTION
    conf_key = "amazon-sqs"
    required_field = "queue_url"
    feature_descriptions = [
        FeatureDescription(
            """
            Forward Sentry errors and events to Amazon SQS.
            """,
            IntegrationFeatures.DATA_FORWARDING,
        )
    ]

    def get_config(self, project, **kwargs):
        """Return the list of form-field definitions for this plugin's settings."""
        return [
            {
                "name": "queue_url",
                "label": "Queue URL",
                "type": "url",
                "placeholder": "https://sqs-us-east-1.amazonaws.com/12345678/myqueue",
            },
            {
                "name": "region",
                "label": "Region",
                "type": "select",
                "choices": tuple((z, z) for z in get_regions()),
            },
            {
                "name": "access_key",
                "label": "Access Key",
                "type": "text",
                "placeholder": "Access Key",
            },
            get_secret_field_config(
                name="secret_key", label="Secret Key", secret=self.get_option("secret_key", project)
            ),
            {
                "name": "message_group_id",
                "label": "Message Group ID",
                "type": "text",
                "required": False,
                "placeholder": "Required for FIFO queues, exclude for standard queues",
            },
            {
                "name": "s3_bucket",
                "label": "S3 Bucket",
                "type": "text",
                "required": False,
                "placeholder": "s3-bucket",
                "help": (
                    "Specify a bucket to store events in S3. The SQS message will contain a reference"
                    " to the payload location in S3. If no S3 bucket is provided, events over the SQS"
                    " limit of 256KB will not be forwarded."
                ),
            },
        ]

    def get_rate_limit(self):
        """Return ``(0, 0)``: no rate limit is applied for SQS."""
        # no rate limit for SQS
        return (0, 0)

    @track_response_metric
    def forward_event(self, event, payload):
        """Send ``payload`` for ``event`` to the configured SQS queue.

        Returns ``None`` when the plugin is not fully configured and ``False``
        when the message is oversized or AWS rejects the request for one of the
        recognised, unrecoverable reasons (bad credentials, missing
        MessageGroupId, missing bucket). Any other ``ClientError`` is re-raised.
        """
        queue_url = self.get_option("queue_url", event.project)
        access_key = self.get_option("access_key", event.project)
        secret_key = self.get_option("secret_key", event.project)
        region = self.get_option("region", event.project)
        message_group_id = self.get_option("message_group_id", event.project)

        # the metrics tags are a subset of logging params
        metric_tags = {
            "project_id": event.project_id,
            "organization_id": event.project.organization_id,
        }
        logging_params = metric_tags.copy()
        logging_params["event_id"] = event.event_id
        logging_params["issue_id"] = event.group_id

        if not all((queue_url, access_key, secret_key, region)):
            logger.info("sentry_plugins.amazon_sqs.skip_unconfigured", extra=logging_params)
            return

        boto3_args = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
            "region_name": region,
        }

        def log_and_increment(metrics_name):
            # Emit the same key as both a log line and a counter.
            logger.info(
                metrics_name,
                extra=logging_params,
            )
            metrics.incr(
                metrics_name,
                tags=metric_tags,
            )

        def s3_put_object(*args, **kwargs):
            s3_client = boto3.client(
                service_name="s3", config=Config(signature_version="s3v4"), **boto3_args
            )
            return s3_client.put_object(*args, **kwargs)

        def sqs_send_message(message):
            client = boto3.client(service_name="sqs", **boto3_args)
            send_message_args = {"QueueUrl": queue_url, "MessageBody": message}
            # need a MessageGroupId for FIFO queues
            # note that if MessageGroupId is specified for non-FIFO, this will fail
            if message_group_id:
                send_message_args["MessageGroupId"] = message_group_id
                # if content based de-duplication is not enabled, we need to provide a
                # MessageDeduplicationId
                send_message_args["MessageDeduplicationId"] = uuid4().hex
            return client.send_message(**send_message_args)

        # wrap S3 put_object and SQS send message in one try/except
        s3_bucket = self.get_option("s3_bucket", event.project)
        try:
            # if we have an S3 bucket, upload to S3
            if s3_bucket:
                # we want something like 2020-08-29 so we can store it by the date
                date = event.datetime.strftime("%Y-%m-%d")
                key = f"{event.project.slug}/{date}/{event.event_id}"
                logger.info("sentry_plugins.amazon_sqs.s3_put_object", extra=logging_params)
                s3_put_object(Bucket=s3_bucket, Body=json.dumps(payload), Key=key)
                url = f"https://{s3_bucket}.s3-{region}.amazonaws.com/{key}"
                # just include the s3Url and the event ID in the payload
                payload = {"s3Url": url, "eventID": event.event_id}

            message = json.dumps(payload)
            if len(message) > SQS_MAX_MESSAGE_SIZE:
                logger.info("sentry_plugins.amazon_sqs.skip_oversized", extra=logging_params)
                return False

            sqs_send_message(message)
        except ClientError as e:
            error_text = str(e)
            if error_text.startswith(
                (
                    "An error occurred (InvalidClientTokenId)",
                    "An error occurred (AccessDenied)",
                )
            ):
                # If there's an issue with the user's token then we can't do
                # anything to recover. Just log and continue.
                log_and_increment("sentry_plugins.amazon_sqs.access_token_invalid")
                return False
            elif error_text.endswith("must contain the parameter MessageGroupId."):
                log_and_increment("sentry_plugins.amazon_sqs.missing_message_group_id")
                return False
            elif error_text.startswith("An error occurred (NoSuchBucket)"):
                # If there's an issue with the user's s3 bucket then we can't do
                # anything to recover. Just log and continue.
                log_and_increment("sentry_plugins.amazon_sqs.s3_bucket_invalid")
                return False
            raise
        # NOTE(review): the source page truncates the snippet here ("..."), so
        # the success-path return value is not visible -- confirm against upstream.
Source:destination.py  
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
from typing import Any, Iterable, Mapping
from uuid import uuid4

import boto3
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
from botocore.exceptions import ClientError


class DestinationAmazonSqs(Destination):
    """Airbyte destination that sends each record as one Amazon SQS message."""

    def queue_is_fifo(self, url: str) -> bool:
        """Return True when the queue URL ends with the ``.fifo`` suffix."""
        return url.endswith(".fifo")

    def parse_queue_name(self, url: str) -> str:
        """Return the text after the last ``/`` of ``url``."""
        return url.rsplit("/", 1)[-1]

    def send_single_message(self, queue, message) -> dict:
        """Send ``message`` (a dict of send_message keyword arguments) via ``queue``."""
        return queue.send_message(**message)

    def build_sqs_message(self, record, message_body_key=None):
        """Build the base message dict for ``record``.

        With ``message_body_key`` set, the body is ``record.data[message_body_key]``
        as-is; otherwise it is the whole ``record.data`` serialized as JSON.

        Raises:
            Exception: if ``message_body_key`` is set but absent (or None) in the record.
        """
        if message_body_key:
            data = record.data.get(message_body_key)
            if data is None:
                raise Exception("Message had no attribute of the configured Message Body Key: " + message_body_key)
        else:
            data = json.dumps(record.data)
        return {"MessageBody": data}

    def add_attributes_to_message(self, record, message):
        """Set ``MessageAttributes`` on ``message`` to the record's emitted-at value."""
        attributes = {"airbyte_emitted_at": {"StringValue": str(record.emitted_at), "DataType": "String"}}
        message["MessageAttributes"] = attributes
        return message

    def set_message_delay(self, message, message_delay):
        """Set ``DelaySeconds`` on ``message`` and return it."""
        message["DelaySeconds"] = message_delay
        return message

    # MessageGroupID and MessageDeduplicationID are required properties for FIFO queues
    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
    def set_message_fifo_properties(self, message, message_group_id, use_content_dedupe=False):
        """Add the FIFO-only properties to ``message`` and return it.

        A random ``MessageDeduplicationId`` is added unless ``use_content_dedupe``
        is truthy.

        Raises:
            Exception: if ``message_group_id`` is empty or None.
        """
        # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
        if not message_group_id:
            raise Exception("Failed to build message - Message Group ID is required for FIFO queues")
        message["MessageGroupId"] = message_group_id
        # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html
        if not use_content_dedupe:
            message["MessageDeduplicationId"] = str(uuid4())
        # TODO: Support getting MessageDeduplicationId from a key in the record
        # if message_dedupe_id:
        #     message['MessageDeduplicationId'] = message_dedupe_id
        return message

    # TODO: Support batch send
    # def send_batch_messages(messages, queue):
    #     entry = {
    #         'Id': "1",
    #         'MessageBody': str(record.data),
    #     }
    #     response = queue.send_messages(Entries=messages)
    #     if 'Successful' in response:
    #         for status in response['Successful']:
    #             print("Message sent: " + status['MessageId'])
    #     if 'Failed' in response:
    #         for status in response['Failed']:
    #             print("Message sent: " + status['MessageId'])

    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """Send every RECORD message to the queue and yield every STATE message.

        Reads ``queue_url``, ``region``, ``access_key`` and ``secret_key``
        (required) plus ``message_delay``, ``message_body_key`` and
        ``message_group_id`` (optional) from ``config``.
        """
        # Required properties
        queue_url = config["queue_url"]
        queue_region = config["region"]

        # TODO: Implement optional params for batch
        # Optional Properties
        # max_batch_size = config.get("max_batch_size", 10)
        # send_as_batch = config.get("send_as_batch", False)
        message_delay = config.get("message_delay")
        message_body_key = config.get("message_body_key")

        # FIFO Properties
        message_group_id = config.get("message_group_id")

        # Sensitive Properties
        access_key = config["access_key"]
        secret_key = config["secret_key"]

        session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=queue_region)
        sqs = session.resource("sqs")
        queue = sqs.Queue(url=queue_url)

        # The URL does not change while writing, so decide FIFO-ness once.
        is_fifo = self.queue_is_fifo(queue_url)

        # TODO: Make access/secret key optional, support public access & profiles
        # TODO: Support adding/setting attributes in the UI
        # TODO: Support extract a specific path as message attributes

        for message in input_messages:
            if message.type == Type.RECORD:
                sqs_message = self.build_sqs_message(message.record, message_body_key)

                if message_delay:
                    sqs_message = self.set_message_delay(sqs_message, message_delay)

                sqs_message = self.add_attributes_to_message(message.record, sqs_message)

                if is_fifo:
                    # Anything other than an explicit "false" counts as enabled;
                    # expressed as a real bool rather than False / "true".
                    use_content_dedupe = queue.attributes.get("ContentBasedDeduplication") != "false"
                    self.set_message_fifo_properties(sqs_message, message_group_id, use_content_dedupe)

                self.send_single_message(queue, sqs_message)
            if message.type == Type.STATE:
                yield message

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Validate ``config`` and return a SUCCEEDED or FAILED connection status.

        Every exception raised during the check is caught and reported as a
        FAILED status whose message contains the exception text.
        """
        try:
            # Required properties
            queue_url = config["queue_url"]
            logger.debug("Amazon SQS Destination Config Check - queue_url: " + queue_url)
            queue_region = config["region"]
            logger.debug("Amazon SQS Destination Config Check - region: " + queue_region)

            # Sensitive Properties
            # Slice ([-1:]) instead of index so an empty credential does not
            # surface as a misleading "string index out of range" failure.
            access_key = config["access_key"]
            logger.debug("Amazon SQS Destination Config Check - access_key (ends with): " + access_key[-1:])
            secret_key = config["secret_key"]
            logger.debug("Amazon SQS Destination Config Check - secret_key (ends with): " + secret_key[-1:])

            logger.debug("Amazon SQS Destination Config Check - Starting connection test ---")
            session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=queue_region)
            sqs = session.resource("sqs")
            queue = sqs.Queue(url=queue_url)
            if hasattr(queue, "attributes"):
                logger.debug("Amazon SQS Destination Config Check - Connection test successful ---")

                if self.queue_is_fifo(queue_url):
                    fifo = queue.attributes.get("FifoQueue", False)
                    if not fifo:
                        raise Exception("FIFO Queue URL set but Queue is not FIFO")

                    # Same falsy test as set_message_fifo_properties, so an empty
                    # string is rejected here rather than on the first write.
                    message_group_id = config.get("message_group_id")
                    if not message_group_id:
                        raise Exception("Message Group ID is not set, but is required for FIFO Queues.")

                    # TODO: Support referencing an ID inside the Record to use as de-dupe ID
                    # message_dedupe_key = config.get("message_dedupe_key")
                    # content_dedupe = queue.attributes.get('ContentBasedDeduplication')
                    # if content_dedupe == "false":
                    #     if message_dedupe_id is None:
                    #         raise Exception("You must provide a Message Deduplication ID when ContentBasedDeduplication is not used.")

                return AirbyteConnectionStatus(status=Status.SUCCEEDED)
            else:
                return AirbyteConnectionStatus(
                    status=Status.FAILED, message="Amazon SQS Destination Config Check - Could not connect to queue"
                )
        except ClientError as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED, message=f"Amazon SQS Destination Config Check - Error in AWS Client: {str(e)}"
            )
        except Exception as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED, message=f"Amazon SQS Destination Config Check - An exception occurred: {str(e)}"
            )
        # NOTE(review): the source page truncates the snippet here ("...").


# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks, e.g. Selenium, Cypress,
# TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
