Best Python code snippet using Kiwi_python
process_dns_updates.py
Source:process_dns_updates.py  
#!/usr/bin/env python
"""process_dns_updates

Gets the latest DNS record change requests from SQS and updates the Route53
records accordingly

NOTE: This is intended to be run within an AWS Lambda container! Correct
functionality outside of Lambda cannot be guaranteed.
"""
# builtin modules
import logging
import re
import json
import os

# 3rd party modules
import boto3
import botocore.exceptions

# Pull some config from env (supplied by AWS Lambda)
AWS_REGION = os.environ['AWS_REGION']  # set by Lambda automatically
SQS_QUEUE_NAME = os.environ['SQS_QUEUE_NAME']  # set in Lambda manually

# Set up some global constants
MAX_MESSAGES = 10  # AWS global limit is 10, so let's get up to that amount
WAIT_TIME_SECONDS = 1  # max seconds to wait for a non-empty SQS queue (20 max)
MAX_MESSAGE_RECEIVE_LOOPS = 30  # set a sane limit to prevent infinite loop

# Set up the root logger with a basic StreamHandler set to log INFO and above.
# This is so we can also emit log events from imported modules.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)

# Override default handler's format. AWS butchers it with useless info.
logging.getLogger().handlers[0].setFormatter(logging.Formatter(
    '%(levelname)s:%(name)s:%(message)s\r'))

# Set up a local logger so we can log our own messages
log = logging.getLogger(__name__)

# Set up API objects at the module level. Lambda will reuse the runtime
# environments when possible, meaning the API connections will be reused.
# https://docs.aws.amazon.com/lambda/latest/dg/running-lambda-code.html
SQS_API = boto3.resource('sqs', region_name=AWS_REGION).get_queue_by_name(
    QueueName=SQS_QUEUE_NAME)
R53_API = boto3.client('route53', region_name=AWS_REGION)


class ErrorSearchException(Exception):
    """
    Exception raised when no requested record name can be found in a Route53
    InvalidChangeBatch error message.

    If this exception is raised, you should check if the error message
    returned by AWS's API has changed since this script was written (see the
    substring check in DnsChangeQueue.mark_failed_record_change() for what is
    expected).
    """
    pass


class DnsChange(object):
    """
    One-stop shop for all your DNS-record-change-request info and actions

    The JSON in each SQS message's body should follow this format:
    {
        "Domain": str(qaolate.com),
        "Action": str(UPSERT),
        "Record": {
            "Name": str(test-record.qaolate.com),
            "Type": str(CNAME),
            "TTL": int(60),
            "Targets": [
                str(test1.example.com),
                str(test2.example.com)
            ]
        }
    }

    Keep in mind that the body JSON is stringified before being sent to SQS so
    all keys and values should be converted into their required data type
    before use
    """
    __hash__ = None  # Instances of this class should not be hashable

    def __init__(self, boto_message):
        """
        Args:
            boto_message (sqs.Message): boto3 object for the SQS message
        """
        # Init some useful class properties
        self._boto_message = boto_message
        self.sent_timestamp = int(boto_message.attributes['SentTimestamp'])
        self.receipt_handle = str(boto_message.receipt_handle)

        # Attributes to be set later
        self.requested = False  # Was this change added to a batch request?
        self.duplicate = False  # Is the change an unused duplicate?
        self.failed = False  # Did the change fail to apply?

        # Parsed body fields. They are initialised here so that a message
        # whose body could not be parsed still exposes every attribute
        # (e.g. `record`) instead of raising AttributeError later on.
        self.message_body = None
        self.domain = None
        self.action = None
        self.record = None
        self.record_type = None
        self.ttl = None
        self.targets = set()

        # Process and load the SQS message's body
        self.safe_load_body()

    def delete(self):
        """Delete the message from the queue"""
        self._boto_message.delete()

    def safe_load_body(self):
        """
        Attempt to load the SQS message's body, and get our expected values
        from it. If the body is not valid JSON, or any expected field is
        missing or has an unusable type/value, log a friendly error and mark
        the message as failed instead of producing a traceback.
        """
        # Attempt to parse the message body into JSON
        try:
            self.message_body = json.loads(self._boto_message.body)
        except (TypeError, ValueError) as e:
            # The body is passed as a logging argument (not concatenated into
            # the format string) so a '%' inside it cannot break formatting.
            log.error("Could not load SQS message body as JSON (%s)! "
                      "Marking message as 'failed'. Message body is:\n%s",
                      e, self._boto_message.body)
            self.failed = True
            return

        # Attempt to load our attributes from the parsed JSON
        try:
            self.domain = str(self.message_body['Domain']).lower()
            self.action = str(self.message_body['Action']).upper()
            self.record = str(self.message_body['Record']['Name']).lower()
            self.record_type = str(self.message_body['Record']['Type']).upper()
            self.ttl = int(self.message_body['Record']['TTL'])
            self.targets = set(
                str(target).lower()
                for target in self.message_body['Record']['Targets'])
        except (KeyError, TypeError, ValueError) as e:
            # KeyError: missing field. TypeError: body/Record is not a mapping
            # or Targets is not iterable. ValueError: e.g. non-integer TTL.
            log.error("Could not load required attribute (%s) from message "
                      "body! Marking message as 'failed'. Message body is:\n%s",
                      e, self.message_body)
            self.failed = True
            return


class DnsChangeQueue(object):
    """
    List of DNS changes being requested, and relevant attributes/actions
    """

    def __init__(self, sqs_queue_name):
        """
        Args:
            sqs_queue_name (str, unicode): name of DNS change SQS queue.
                NOTE: currently unused; the module-level SQS_API object
                (built from the SQS_QUEUE_NAME env var) is what gets used.
        """
        self._sqs_api = SQS_API
        self._r53_api = R53_API
        self._messages = None  # list()
        self._record_changes = None  # dict()
        self._hosted_zones = None  # dict()

    @property
    def messages(self):
        """
        Build a list of DnsChange objects, attempting to receive all
        messages in the queue

        The queue is polled until a poll returns no messages or
        MAX_MESSAGE_RECEIVE_LOOPS polls have been made. The result is cached,
        so the queue is only read on first access.

        Returns:
            list of DnsChange() objects: all received messages from SQS
        """
        if self._messages is None:
            self._messages = list()
            num_received_msgs = None
            receive_loops = 0
            # Strictly-less-than so we never exceed the configured maximum
            while (num_received_msgs != 0 and
                   receive_loops < MAX_MESSAGE_RECEIVE_LOOPS):
                receive_loops += 1
                log.info('Starting receive_messages loop #%s', receive_loops)
                num_received_msgs = 0
                messages_from_boto = self._sqs_api.receive_messages(
                    AttributeNames=['SentTimestamp'],
                    MaxNumberOfMessages=MAX_MESSAGES,
                    WaitTimeSeconds=WAIT_TIME_SECONDS)
                for boto_message in messages_from_boto:
                    self._messages.append(DnsChange(boto_message))
                    num_received_msgs += 1
                log.info('Received %s messages from SQS', num_received_msgs)
            log.info('Received a total of %s messages from SQS',
                     len(self._messages))
        return self._messages

    @property
    def record_changes(self):
        """
        Return a slightly easier-to-process data set of our DNS changes.

        When calling this for the first time, it will create a new dict in the
        following format:
        {
            'foo.tld': {
                'record1.foo.tld': [
                    DnsChange(),
                    DnsChange()
                ]
            }
        }
        Messages already marked as failed (e.g. unparseable bodies) are left
        out. This makes it much easier to dedupe the changes and build a
        Route53 request later.

        Returns:
            dict
        """
        if self._record_changes is None:
            self._record_changes = dict()
            # For each message, add it to the new dict, creating keys if they
            # don't exist already
            for msg in self.messages:
                if msg.failed is not True:
                    self._record_changes.setdefault(
                        msg.domain, {}).setdefault(
                            msg.record, []).append(msg)
        return self._record_changes

    def dedupe_change_requests(self):
        """
        Find and process any 'duplicate' change requests

        If any record has multiple changes (messages) in the queue, keep only
        most recent one, discarding the rest (and marking them as duplicates so
        the unused messages can be deleted later).
        """
        log.info('Removing any duplicate changes')
        for domain_records in self.record_changes.values():
            for record, changes in domain_records.items():
                if len(changes) < 2:
                    continue
                # Sort the change list by timestamp, newest first
                newest_first = sorted(changes,
                                      key=lambda c: c.sent_timestamp,
                                      reverse=True)
                log.info(('Found %s change requests for %s. '
                          'Using only the most recent one.'),
                         len(newest_first), record)
                # Keep only the newest change, marking the others as
                # duplicates and dropping them from the change list
                for stale in newest_first[1:]:
                    stale.duplicate = True
                # Replacing the value of an existing key is safe while
                # iterating, since the dict's size does not change
                domain_records[record] = newest_first[:1]

    @property
    def hosted_zones(self):
        """
        Create a dict of all our hosted zones and their IDs

        Only a single page of up to 100 zones is requested; a warning is
        logged if Route53 reports that the listing was truncated.

        Returns:
            dict: {zone_name: zone_id, ...}
        """
        if self._hosted_zones is None:
            self._hosted_zones = dict()
            response = self._r53_api.list_hosted_zones_by_name(MaxItems='100')
            if response.get('IsTruncated'):
                log.warning('More than 100 hosted zones exist; only the '
                            'first 100 were loaded')
            # For each hosted zone...
            for zone in response['HostedZones']:
                # Cut off the trailing '.'
                zone_name = zone['Name'].rstrip('.')
                # Get just the ID, not the full path
                zone_id = zone['Id'].split('/')[-1]
                # Add to our dict
                self._hosted_zones[zone_name] = zone_id
        return self._hosted_zones

    def generate_r53_changebatch(self, domain):
        """
        Return a properly formed R53 changebatch request dict for a domain

        Every change that is included gets its `requested` flag set; changes
        marked as failed are skipped.

        Args:
            domain (str): which domain to build the changebatch for

        Returns:
            dict: {'Changes': [change_dict, ...]}
        """
        log.info('Generating Route53 ChangeBatch request for %s', domain)
        changes = list()
        for record in self.record_changes[domain]:
            change = self.record_changes[domain][record][0]
            # Ignore any messages marked as 'failed'
            if change.failed is True:
                continue
            change.requested = True
            # Sorted so the generated request is deterministic
            resource_records = [
                {'Value': target} for target in sorted(change.targets)]
            log.info('Adding change to request: %s -> %s',
                     change.record, change.targets)
            changes.append(
                {
                    'Action': change.action,
                    'ResourceRecordSet': {
                        'Name': change.record,
                        'Type': change.record_type,
                        'TTL': change.ttl,
                        'ResourceRecords': resource_records
                    }
                }
            )
        return {'Changes': changes}

    def mark_failed_record_change(self, domain, error_msg):
        """
        Given a Route53 InvalidChangeBatch error message, determine what
        record failed to change and mark it as failed

        Args:
            domain (str): hosted zone the record lives in
            error_msg (str, unicode): error message returned from Route53

        Raises:
            ErrorSearchException: if no not-yet-failed record name appears in
                the error message. Without this, the caller would resend the
                exact same (still invalid) batch forever.
        """
        newly_failed = False
        # Iterate over _every_ requested change and use the record name to...
        for record, changes in self.record_changes[domain].items():
            # ...check if it's present within the error message string.
            # If it is, then it's very likely to be the record causing the
            # failure and it should be removed
            if record not in error_msg:
                continue
            log.error("Removing '%s' from change list and retrying", record)
            for msg in changes:
                if not msg.failed:
                    newly_failed = True
                msg.failed = True
        if not newly_failed:
            raise ErrorSearchException(
                'Could not find any pending record for %s in the Route53 '
                'error message: %s' % (domain, error_msg))

    def commit_changes_to_r53(self):
        """
        Attempt to make all the changes in our queue

        For each domain, the change batch is sent to Route53. If Route53
        rejects it with InvalidChangeBatch, the offending record is marked as
        failed and the batch is rebuilt and resent, until it succeeds or no
        changes are left.

        Raises:
            botocore.exceptions.ClientError: for any Route53 error other than
                InvalidChangeBatch.
            ErrorSearchException: if an InvalidChangeBatch error could not be
                attributed to any pending record.
        """
        # Before committing, we should remove any duplicate changes
        self.dedupe_change_requests()

        for domain in self.record_changes:
            zone_id = self.hosted_zones.get(domain)
            if zone_id is None:
                # Leave these messages in the queue (failed messages are never
                # deleted) rather than aborting every other domain's changes
                log.error("No Route53 hosted zone found for '%s'. Marking "
                          "its changes as 'failed'.", domain)
                for changes in self.record_changes[domain].values():
                    for msg in changes:
                        msg.failed = True
                continue

            # Run this in a loop so we can re-attempt if we get
            # InvalidChangeBatch
            while True:
                # Build the changebatch fresh for each loop
                changebatch = self.generate_r53_changebatch(domain)

                # If we run out of records to send, let's fail gracefully
                if not changebatch['Changes']:
                    failed = sorted(set(
                        msg.record for msg in self.messages
                        if msg.failed is True and msg.record is not None))
                    log.critical(
                        'No more non-failing changes to submit! '
                        'For reference, the records that are unable to be '
                        'updated are:\n%s'
                        '\nYou may want to process them manually.',
                        '\n'.join(failed))
                    break

                log.info('Sending ChangeBatch request for %s', domain)
                try:
                    self._r53_api.change_resource_record_sets(
                        HostedZoneId=zone_id,
                        ChangeBatch=changebatch)
                # If we get InvalidChangeBatch, try to remove the offending
                # change, and restart the loop
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] != 'InvalidChangeBatch':
                        raise
                    # Log the exception itself; `e.message` does not exist on
                    # Python 3
                    log.error('Route53: %s', e)
                    self.mark_failed_record_change(
                        domain, e.response['Error']['Message'])
                # If we're still here, everything went well so we should break
                # out of the loop
                else:
                    break

    def delete_processed_messages(self):
        """
        Delete all the messages we successfully processed

        A message is deleted if it was requested or is an unused duplicate,
        and is not marked as failed. Failed messages stay in the queue.
        """
        # Build a list of messages that we can delete (i.e. successfully
        # processed)
        messages_to_delete = [
            message for message in self.messages
            if (message.requested or message.duplicate) and not message.failed]
        log.info('Deleting %s messages (in batches of 10)',
                 len(messages_to_delete))

        # Batch delete messages 10 at a time (max per the API)
        batch_size = 10
        deleted_messages_count = 0
        for start in range(0, len(messages_to_delete), batch_size):
            batch = messages_to_delete[start:start + batch_size]
            entries = [
                {'Id': str(index), 'ReceiptHandle': message.receipt_handle}
                for index, message in enumerate(batch)]
            response = self._sqs_api.delete_messages(Entries=entries)
            # Individual entries can fail even if the call itself succeeds
            failures = (response or {}).get('Failed', [])
            for failure in failures:
                log.error('Could not delete message from SQS: %s', failure)
            deleted_messages_count += len(batch) - len(failures)
        log.info('Deleted %s messages from the SQS queue',
                 deleted_messages_count)


def lambda_handler(event, context):
    """
    Lambda execution handler
    https://docs.aws.amazon.com/lambda/latest/dg/
    python-programming-model-handler-types.html
    """
    # Create our class objects
    dns_change_queue = DnsChangeQueue(SQS_QUEUE_NAME)
    # Update DNS records for each message in the queue
    dns_change_queue.commit_changes_to_r53()
    # Clean up the messages we just processed
    # ... (the remainder of this snippet is truncated in the source listing)

# Next snippet: admin.py
Source:admin.py  
...30    # Override this field to change the 'target' object used for retrieving and31    # displaying history.  It should be a double__underscore__delimited32    # property related to the primary model admin object.33    papertrail_field = None34    def _record_changes(self, obj, fields=None):35        '''36        Records the state of `obj` to a JSON-serializable object, optionally37        recording only values in a list of `fields`.  If `fields` is not38        specified, all fields will be recorded.39        '''40        rec = json.loads(serializers.serialize('json', [obj]))[0]41        if fields:42            rec['fields'] = {k: v for k, v in rec['fields'].items()43                             if k in fields}44        return rec45    def _resolve_pt_object(self, object):46        instance = object47        if self.papertrail_field:48            for field_name in self.papertrail_field.split('__'):49                instance = getattr(instance, field_name, None)50                if not instance:51                    return object52        return instance53    def log_addition(self, request, object, message=None):54        if message is not None:55            super(AdminEventLoggerMixin, self).log_addition(request, object, message)56            try:57                message = json.loads(message)58            except ValueError:59                pass60        else:61            super(AdminEventLoggerMixin, self).log_addition(request, object)62        fields = self._record_changes(object)['fields']63        return papertrail.log(64            'admin-edit',65            'Created object',66            data={67                'action': 'add',68                'fields': fields,69                'message': message,70            },71            targets={72               'acting_user': request.user,73               'instance': self._resolve_pt_object(object),74            },75        )76    def log_change(self, request, object, message):77        
super(AdminEventLoggerMixin, self).log_change(request, object, message)78        # construct_change_message() creates a JSON message that we load and79        # store here. (In case we don't get JSON back for some reason, still80        # store the message)81        try:82            data = {'changes': json.loads(message), 'action': 'change'}83        except ValueError:84            data = {'message': message, 'action': 'change'}85        return papertrail.log(86            'admin-edit',87            'Updated object',88            data=data,89            targets={90                'acting_user': request.user,91                'instance': self._resolve_pt_object(object),92            },93        )94    def log_deletion(self, request, object, object_repr):95        super(AdminEventLoggerMixin, self).log_deletion(request, object, object_repr)96        fields = self._record_changes(object)['fields']97        return papertrail.log(98            'admin-edit',99            'Deleted object',100            data={101                'action': 'delete',102                'fields': fields,103            },104            targets={105                'acting_user': request.user,106                'instance': self._resolve_pt_object(object),107            },108        )109    def get_urls(self):110        info = self.model._meta.app_label, self.model._meta.model_name111        urlpatterns = [112            url(r'^(.+)/papertrail/', self.admin_site.admin_view(self.view_papertrail_item), name=u'{0}_{1}_papertrail'.format(*info)),113        ] + super(AdminEventLoggerMixin, self).get_urls()114        return urlpatterns115    def get_actions(self, request):116        actions = super(AdminEventLoggerMixin, self).get_actions(request)117        func, name, desc = self.get_action('view_papertrail')118        actions[name] = (func, name, desc)119        return actions120    def view_papertrail_item(self, request, object_id, extra_context=None):121        get_object_or_404(self.model, 
id=object_id)122        return self.view_papertrail(request, self.model.objects.filter(id=object_id))123    def view_papertrail(self, request, queryset, extra_context=None):124        '''125        Action for displaying the papertrail for one or more selected admin126        items.127        '''128        # Map to alternate queryset if specified129        if self.papertrail_field:130            queryset = self._map_to_related_queryset(queryset, self.papertrail_field)131        action_list = Entry.objects.related_to(queryset).select_related()132        opts = queryset.model._meta133        app_label = opts.app_label134        available_type_filters = collections.OrderedDict(self.papertrail_type_filters)135        if available_type_filters:136            available_type_filters['Other'] = ('__other', )137        selected_type_filters = request.POST.getlist('papertrail-selected-type-filters')138        selected_types = list(itertools.chain.from_iterable(available_type_filters.get(k, []) for k in selected_type_filters))139        if '__other' in selected_types:140            all_types = list(itertools.chain.from_iterable(available_type_filters.values()))141            action_list = action_list.filter(Q(type__in=selected_types) | ~Q(type__in=all_types))142        elif selected_types:143            action_list = action_list.filter(type__in=selected_types)144        if queryset.count() == 1:145            obj = queryset[0]146            title = _('Paper Trail: %s') % force_text(obj)147        else:148            obj = None149            title = _('Paper Trail: %s %s') % (queryset.count(), opts.verbose_name_plural)150        context = {151            'title': title,152            'action_list': action_list,153            'module_name': capfirst(force_text(opts.verbose_name_plural)),154            'app_label': app_label,155            'opts': opts,156            'object': obj,157            'available_type_filters': available_type_filters,158            
'selected_type_filters': selected_type_filters,159            'selected_actions': request.POST.getlist('_selected_action'),160        }161        context.update(extra_context or {})162        return render(request, self.papertrail_object_template or [163            "admin/%s/%s/object_papertrail.html" % (app_label, opts.object_name.lower()),164            "admin/%s/object_papertrail.html" % app_label,165            "admin/object_papertrail.html"166        ], context)167    view_papertrail.short_description = 'View Paper Trail'168    def _map_to_related_queryset(self, queryset, field):169        '''170        Given a queryset and an double__underscore__delimited field relation,171        return a list of the objects that are the target of that relation for172        all objects in the queryset.173        '''174        model = queryset.model175        # The easy part is finding the pks of the related objects176        pks = queryset.values_list('{}__pk'.format(field), flat=True)177        # The hard part is traversing the relation string and finding out what178        # model we're actually looking for.179        segments = field.split('__')180        for segment in segments:181            field = getattr(model, segment).field182            try:183                model = field.related.parent_model184            except AttributeError:185                try:186                    model = field.rel.model187                except AttributeError:188                    model = field.related_model189        # Once we have both pieces, we can just query the model for the ids190        return model.objects.filter(pk__in=pks)191    def construct_change_message(self, request, form, formsets, add=False):192        '''193        Construct a detailed change message from a changed object, including194        related objects updated via subforms.  
Returns a JSON string containing195        a structure detailing the fields changed and their updated values.196        '''197        def add_related_change(changes, obj, action='change', fields=None):198            rec = self._record_changes(obj, fields=fields)199            rec['action'] = action200            changes['related_changes'].append(rec)201            return rec202        changes = {203            'action': 'change',204            'fields': self._record_changes(form.instance, form.changed_data)['fields'],205            'related_changes': [],206        }207        for formset in (formsets or []):208            for obj in formset.new_objects:209                add_related_change(changes, obj, action='add')210            for obj, changed_fields in formset.changed_objects:211                add_related_change(changes, obj, action='change', fields=changed_fields)212            for obj in formset.deleted_objects:213                add_related_change(changes, obj, action='add')214        return json.dumps(changes)215class EntryRelatedObjectInline(admin.StackedInline):216    model = EntryRelatedObject217    extra = 0218    fields = ('relation_name', 'related_content_type', 'related_model', )...record.py
Source:record.py  
import logging
from datetime import datetime

from .db_dummy import AttributeValueNotSet
from .database import EntityDatabase


class Record:
    """
    Internal cache for database record and its updates to prevent loading whole record from database.
    Only record's attributes, which are really needed, will be loaded.
    """
    # TODO remove EntityDatabase
    def __init__(self, db: EntityDatabase, table_name, key):
        """
        :param db: database connection used to load and store the record
        :param table_name: name of the table the record belongs to
        :param key: key of the record in the table
        """
        self._db_connection = db
        self._log = logging.getLogger("Record")
        self.table_name = table_name
        self.key = key
        # record structure
        self._record = {}
        # structure, which will hold all attributes, which were updated, all their updated values
        self._record_changes = {}
        self.exists_in_db = self.exists()

    def _load_from_db(self, attrib_name):
        """
        Loads attribute from database into cache.
        WARNING: If the attribute has no value set in database yet, the cache remains the same
        :param attrib_name: attribute's name
        :return: None
        """
        try:
            self._record[attrib_name] = self._db_connection.get_attrib(self.table_name, self.key, attrib_name)
        except AttributeValueNotSet:  # TODO: the EntityDatabase.get_attrib doesn't raise exception like this
            pass

    def __getitem__(self, attrib_name):
        """
        Overrides functionality, when "Record[key]" is called. Gets value from record.
        :param attrib_name: key to value, which wants to be obtained from record
        :return: value, which is saved under the key in record
        :raise KeyError when the attrib_name is in neither the cache nor the database
        """
        # if attribute is not loaded into cache yet, load it from database
        if self._record.get(attrib_name) is None:
            self._load_from_db(attrib_name)
        return self._record[attrib_name]

    def __setitem__(self, attrib_name, value):
        """
        Overrides functionality, when "Record[key] = value" is called. Sets value in record.
        :param attrib_name: attribute's name in record
        :param value: new value under the key
        :return: None
        """
        self._record[attrib_name] = value
        # cache the changes
        self._record_changes[attrib_name] = value

    def __contains__(self, attrib_name):
        """
        Overrides functionality, when "key in Record" is called.
        :param attrib_name: key name
        :return: True if attribute is in record and its value is not None, False otherwise
        """
        # A cached None is treated as "not loaded", the same way __getitem__ and get() treat it
        if self._record.get(attrib_name) is not None:
            return True
        # if attribute not in cache, try to load it from database (if does not exist in database, cache will not be
        # updated)
        self._load_from_db(attrib_name)
        return self._record.get(attrib_name) is not None

    def __delitem__(self, attrib_name):
        """
        Overrides functionality, when "del Record[key]" is called. Deletes attribute from record.
        :param attrib_name: attribute's name, which should be deleted
        :return: None
        """
        self._record.pop(attrib_name, None)
        # also drop any pending update, otherwise push_changes_to_db() would write the deleted attribute back
        self._record_changes.pop(attrib_name, None)
        self._db_connection.delete_attribute(self.table_name, self.key, attrib_name)

    def exists(self):
        """
        Checks, whether record exists in the database.
        :return: True if record exists in the database, False otherwise
        """
        return self._db_connection.exists(self.table_name, self.key)

    def update(self, dict_update):
        """
        Behaves like classic dict.update(); the updated values are also remembered as pending changes.
        """
        self._record.update(dict_update)
        self._record_changes.update(dict_update)

    def get(self, attrib_name, default_val=None, load_from_db=True):
        """
        Behaves same as Dict's get, but can request loading record from database with 'load_from_db' flag, if not cached
        """
        if load_from_db and self._record.get(attrib_name) is None:
            self._load_from_db(attrib_name)
        return self._record.get(attrib_name, default_val)

    def push_changes_to_db(self):
        """
        Send all updates to database.
        :return: None
        """
        if self._record_changes:
            self['ts_last_update'] = datetime.utcnow()
            if self.exists_in_db:
                self._db_connection.update_record(self.table_name, self.key, self._record_changes)
            else:
                # if new record get created, _record and _record_changes should contain same data
                self._db_connection.create_record(self.table_name, self.key, self._record)
            # reset record changes, because they were pushed to database, but leave record cache, it may be still needed
            # (but probably won't)
            # ... (the remainder of this snippet is truncated in the source listing)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
