How to use the message_group_id parameter in localstack

Best Python code snippets using localstack_python
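Both snippets below hand a user-supplied message_group_id through to the MessageGroupId parameter of SQS's send_message call. MessageGroupId is mandatory for FIFO queues and must be omitted for standard ones, which is why both codebases treat it as an optional config field. As a minimal sketch of the same call made directly against LocalStack (assumptions: LocalStack is running on its default edge port 4566, dummy credentials are accepted locally, and the queue name is made up for illustration):

import uuid

import boto3

# Point boto3 at a locally running LocalStack instance (assumed endpoint;
# LocalStack accepts arbitrary test credentials).
sqs = boto3.client(
    "sqs",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

# FIFO queue names must end in ".fifo"; the name here is hypothetical.
queue_url = sqs.create_queue(
    QueueName="example-queue.fifo",
    Attributes={"FifoQueue": "true"},
)["QueueUrl"]

# MessageGroupId orders messages within a group; without content-based
# deduplication enabled on the queue, a MessageDeduplicationId is also required.
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody='{"hello": "world"}',
    MessageGroupId="example-group",
    MessageDeduplicationId=uuid.uuid4().hex,
)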

plugin.py

Source: plugin.py (GitHub). This is Sentry's Amazon SQS data-forwarding plugin; it attaches a MessageGroupId to send_message only when the configured queue is FIFO.


import logging

import boto3
from botocore.client import ClientError, Config

from sentry.integrations import FeatureDescription, IntegrationFeatures
from sentry.plugins.bases.data_forwarding import DataForwardingPlugin
from sentry.utils import json, metrics
from sentry_plugins.base import CorePluginMixin
from sentry_plugins.utils import get_secret_field_config

logger = logging.getLogger(__name__)

DESCRIPTION = """
Forward Sentry events to Amazon SQS.

Amazon Simple Queue Service (SQS) is a fully managed message
queuing service that enables you to decouple and scale microservices,
distributed systems, and serverless applications.
"""


def get_regions():
    public_region_list = boto3.session.Session().get_available_regions("sqs")
    cn_region_list = boto3.session.Session().get_available_regions("sqs", partition_name="aws-cn")
    return public_region_list + cn_region_list


def track_response_metric(fn):
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Queue.send_message
    # boto3's send_message doesn't return success/fail or http codes
    # success is a boolean based on whether there was an exception or not
    def wrapper(*args, **kwargs):
        try:
            success = fn(*args, **kwargs)
            metrics.incr(
                "data-forwarding.http_response", tags={"plugin": "amazon-sqs", "success": success}
            )
        except Exception:
            metrics.incr(
                "data-forwarding.http_response", tags={"plugin": "amazon-sqs", "success": False}
            )
            raise
        return success

    return wrapper


class AmazonSQSPlugin(CorePluginMixin, DataForwardingPlugin):
    title = "Amazon SQS"
    slug = "amazon-sqs"
    description = DESCRIPTION
    conf_key = "amazon-sqs"
    required_field = "queue_url"
    feature_descriptions = [
        FeatureDescription(
            """
            Forward Sentry errors and events to Amazon SQS.
            """,
            IntegrationFeatures.DATA_FORWARDING,
        )
    ]

    def get_config(self, project, **kwargs):
        return [
            {
                "name": "queue_url",
                "label": "Queue URL",
                "type": "url",
                "placeholder": "https://sqs-us-east-1.amazonaws.com/12345678/myqueue",
            },
            {
                "name": "region",
                "label": "Region",
                "type": "select",
                "choices": tuple((z, z) for z in get_regions()),
            },
            {
                "name": "access_key",
                "label": "Access Key",
                "type": "text",
                "placeholder": "Access Key",
            },
            get_secret_field_config(
                name="secret_key", label="Secret Key", secret=self.get_option("secret_key", project)
            ),
            {
                "name": "message_group_id",
                "label": "Message Group ID",
                "type": "text",
                "required": False,
                "placeholder": "Required for FIFO queues, exclude for standard queues",
            },
            {
                "name": "s3_bucket",
                "label": "S3 Bucket",
                "type": "text",
                "required": False,
                "placeholder": "s3-bucket",
                "help": (
                    "Specify a bucket to store events in S3. The SQS message will contain a reference"
                    " to the payload location in S3. If no S3 bucket is provided, events over the SQS"
                    " limit of 256KB will not be forwarded."
                ),
            },
        ]

    def get_rate_limit(self):
        # no rate limit for SQS
        return (0, 0)

    @track_response_metric
    def forward_event(self, event, payload):
        queue_url = self.get_option("queue_url", event.project)
        access_key = self.get_option("access_key", event.project)
        secret_key = self.get_option("secret_key", event.project)
        region = self.get_option("region", event.project)
        message_group_id = self.get_option("message_group_id", event.project)

        # the metrics tags are a subset of logging params
        metric_tags = {
            "project_id": event.project_id,
            "organization_id": event.project.organization_id,
        }
        logging_params = metric_tags.copy()
        logging_params["event_id"] = event.event_id
        logging_params["issue_id"] = event.group_id

        if not all((queue_url, access_key, secret_key, region)):
            logger.info("sentry_plugins.amazon_sqs.skip_unconfigured", extra=logging_params)
            return

        boto3_args = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
            "region_name": region,
        }

        def log_and_increment(metrics_name):
            logger.info(
                metrics_name,
                extra=logging_params,
            )
            metrics.incr(
                metrics_name,
                tags=metric_tags,
            )

        def s3_put_object(*args, **kwargs):
            s3_client = boto3.client(
                service_name="s3", config=Config(signature_version="s3v4"), **boto3_args
            )
            return s3_client.put_object(*args, **kwargs)

        def sqs_send_message(message):
            client = boto3.client(service_name="sqs", **boto3_args)
            send_message_args = {"QueueUrl": queue_url, "MessageBody": message}
            # need a MessageGroupId for FIFO queues
            # note that if MessageGroupId is specified for non-FIFO, this will fail
            if message_group_id:
                from uuid import uuid4

                send_message_args["MessageGroupId"] = message_group_id
                # if content based de-duplication is not enabled, we need to provide a
                # MessageDeduplicationId
                send_message_args["MessageDeduplicationId"] = uuid4().hex
            return client.send_message(**send_message_args)

        # wrap S3 put_object and SQS send_message in one try/except
        s3_bucket = self.get_option("s3_bucket", event.project)
        try:
            # if we have an S3 bucket, upload to S3
            if s3_bucket:
                # we want something like 2020-08-29 so we can store it by the date
                date = event.datetime.strftime("%Y-%m-%d")
                key = f"{event.project.slug}/{date}/{event.event_id}"
                logger.info("sentry_plugins.amazon_sqs.s3_put_object", extra=logging_params)
                s3_put_object(Bucket=s3_bucket, Body=json.dumps(payload), Key=key)
                url = f"https://{s3_bucket}.s3-{region}.amazonaws.com/{key}"
                # just include the s3Url and the event ID in the payload
                payload = {"s3Url": url, "eventID": event.event_id}
            message = json.dumps(payload)
            if len(message) > 256 * 1024:
                logger.info("sentry_plugins.amazon_sqs.skip_oversized", extra=logging_params)
                return False
            sqs_send_message(message)
        except ClientError as e:
            if str(e).startswith("An error occurred (InvalidClientTokenId)") or str(e).startswith(
                "An error occurred (AccessDenied)"
            ):
                # If there's an issue with the user's token then we can't do
                # anything to recover. Just log and continue.
                log_and_increment("sentry_plugins.amazon_sqs.access_token_invalid")
                return False
            elif str(e).endswith("must contain the parameter MessageGroupId."):
                log_and_increment("sentry_plugins.amazon_sqs.missing_message_group_id")
                return False
            elif str(e).startswith("An error occurred (NoSuchBucket)"):
                # If there's an issue with the user's s3 bucket then we can't do
                # anything to recover. Just log and continue.
                log_and_increment("sentry_plugins.amazon_sqs.s3_bucket_invalid")
                return False
            raise
...
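The endswith check in the second except branch above can be reproduced locally: calling send_message on a FIFO queue without a MessageGroupId raises a ClientError whose message should end with exactly the string the plugin matches. A short sketch against LocalStack (endpoint, credentials, and queue name are assumptions, not part of the plugin):

import boto3
from botocore.exceptions import ClientError

# Assumed LocalStack endpoint and test credentials, as in the first sketch.
sqs = boto3.client(
    "sqs",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)
queue_url = sqs.create_queue(
    QueueName="plugin-test.fifo", Attributes={"FifoQueue": "true"}
)["QueueUrl"]

try:
    # Deliberately omit MessageGroupId on a FIFO queue to hit the error
    # branch that forward_event matches with str(e).endswith(...).
    sqs.send_message(QueueUrl=queue_url, MessageBody="{}")
except ClientError as e:
    if str(e).endswith("must contain the parameter MessageGroupId."):
        print("missing_message_group_id branch would fire")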


destination.py

Source: destination.py (GitHub). This is Airbyte's Amazon SQS destination connector; its check step requires message_group_id in the config whenever the target queue URL ends in .fifo.


#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
from typing import Any, Iterable, Mapping
from uuid import uuid4

import boto3
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
from botocore.exceptions import ClientError


class DestinationAmazonSqs(Destination):
    def queue_is_fifo(self, url: str) -> bool:
        return url.endswith(".fifo")

    def parse_queue_name(self, url: str) -> str:
        return url.rsplit("/", 1)[-1]

    def send_single_message(self, queue, message) -> dict:
        return queue.send_message(**message)

    def build_sqs_message(self, record, message_body_key=None):
        data = None
        if message_body_key:
            data = record.data.get(message_body_key)
            if data is None:
                raise Exception("Message had no attribute of the configured Message Body Key: " + message_body_key)
        else:
            data = json.dumps(record.data)
        message = {"MessageBody": data}
        return message

    def add_attributes_to_message(self, record, message):
        attributes = {"airbyte_emitted_at": {"StringValue": str(record.emitted_at), "DataType": "String"}}
        message["MessageAttributes"] = attributes
        return message

    def set_message_delay(self, message, message_delay):
        message["DelaySeconds"] = message_delay
        return message

    # MessageGroupID and MessageDeduplicationID are required properties for FIFO queues
    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
    def set_message_fifo_properties(self, message, message_group_id, use_content_dedupe=False):
        # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
        if not message_group_id:
            raise Exception("Failed to build message - Message Group ID is required for FIFO queues")
        else:
            message["MessageGroupId"] = message_group_id
        # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html
        if not use_content_dedupe:
            message["MessageDeduplicationId"] = str(uuid4())
        # TODO: Support getting MessageDeduplicationId from a key in the record
        # if message_dedupe_id:
        #     message['MessageDeduplicationId'] = message_dedupe_id
        return message

    # TODO: Support batch send
    # def send_batch_messages(messages, queue):
    #     entry = {
    #         'Id': "1",
    #         'MessageBody': str(record.data),
    #     }
    #     response = queue.send_messages(Entries=messages)
    #     if 'Successful' in response:
    #         for status in response['Successful']:
    #             print("Message sent: " + status['MessageId'])
    #     if 'Failed' in response:
    #         for status in response['Failed']:
    #             print("Message sent: " + status['MessageId'])

    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        # Required properties
        queue_url = config["queue_url"]
        queue_region = config["region"]
        # TODO: Implement optional params for batch
        # Optional Properties
        # max_batch_size = config.get("max_batch_size", 10)
        # send_as_batch = config.get("send_as_batch", False)
        message_delay = config.get("message_delay")
        message_body_key = config.get("message_body_key")
        # FIFO Properties
        message_group_id = config.get("message_group_id")
        # Sensitive Properties
        access_key = config["access_key"]
        secret_key = config["secret_key"]

        session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=queue_region)
        sqs = session.resource("sqs")
        queue = sqs.Queue(url=queue_url)

        # TODO: Make access/secret key optional, support public access & profiles
        # TODO: Support adding/setting attributes in the UI
        # TODO: Support extracting a specific path as message attributes
        for message in input_messages:
            if message.type == Type.RECORD:
                sqs_message = self.build_sqs_message(message.record, message_body_key)
                if message_delay:
                    sqs_message = self.set_message_delay(sqs_message, message_delay)
                sqs_message = self.add_attributes_to_message(message.record, sqs_message)
                if self.queue_is_fifo(queue_url):
                    use_content_dedupe = False if queue.attributes.get("ContentBasedDeduplication") == "false" else "true"
                    self.set_message_fifo_properties(sqs_message, message_group_id, use_content_dedupe)
                self.send_single_message(queue, sqs_message)
            if message.type == Type.STATE:
                yield message

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        try:
            # Required properties
            queue_url = config["queue_url"]
            logger.debug("Amazon SQS Destination Config Check - queue_url: " + queue_url)
            queue_region = config["region"]
            logger.debug("Amazon SQS Destination Config Check - region: " + queue_region)
            # Sensitive Properties
            access_key = config["access_key"]
            logger.debug("Amazon SQS Destination Config Check - access_key (ends with): " + access_key[-1])
            secret_key = config["secret_key"]
            logger.debug("Amazon SQS Destination Config Check - secret_key (ends with): " + secret_key[-1])

            logger.debug("Amazon SQS Destination Config Check - Starting connection test ---")
            session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=queue_region)
            sqs = session.resource("sqs")
            queue = sqs.Queue(url=queue_url)
            if hasattr(queue, "attributes"):
                logger.debug("Amazon SQS Destination Config Check - Connection test successful ---")
                if self.queue_is_fifo(queue_url):
                    fifo = queue.attributes.get("FifoQueue", False)
                    if not fifo:
                        raise Exception("FIFO Queue URL set but Queue is not FIFO")
                    message_group_id = config.get("message_group_id")
                    if message_group_id is None:
                        raise Exception("Message Group ID is not set, but is required for FIFO Queues.")
                    # TODO: Support referencing an ID inside the Record to use as de-dupe ID
                    # message_dedupe_key = config.get("message_dedupe_key")
                    # content_dedupe = queue.attributes.get('ContentBasedDeduplication')
                    # if content_dedupe == "false":
                    #     if message_dedupe_id is None:
                    #         raise Exception("You must provide a Message Deduplication ID when ContentBasedDeduplication is not used.")
                return AirbyteConnectionStatus(status=Status.SUCCEEDED)
            else:
                return AirbyteConnectionStatus(
                    status=Status.FAILED, message="Amazon SQS Destination Config Check - Could not connect to queue"
                )
        except ClientError as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED, message=f"Amazon SQS Destination Config Check - Error in AWS Client: {str(e)}"
            )
        except Exception as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED, message=f"Amazon SQS Destination Config Check - An exception occurred: {str(e)}"
            )
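Because send_single_message simply unpacks the built dict into Queue.send_message, the FIFO path above boils down to one resource-level call carrying MessageGroupId and, when content-based deduplication is off, MessageDeduplicationId. A rough standalone equivalent run against LocalStack (session, endpoint, and queue name are assumptions, not part of the connector):

import json
from uuid import uuid4

import boto3

# Assumed LocalStack session; any test credentials work locally.
session = boto3.Session(
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1"
)
sqs = session.resource("sqs", endpoint_url="http://localhost:4566")
queue = sqs.create_queue(
    QueueName="airbyte-dest.fifo", Attributes={"FifoQueue": "true"}
)

# The same message shape destination.py builds before sending.
message = {
    "MessageBody": json.dumps({"id": 1}),
    "MessageGroupId": "airbyte-group",  # required for FIFO queues
    "MessageDeduplicationId": str(uuid4()),  # needed when content-based dedupe is off
}
queue.send_message(**message)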


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

