Best Python code snippet using localstack_python
aws_snapshot.py
Source:aws_snapshot.py  
...  # earlier lines of aws_snapshot.py are omitted from this excerpt

# NOTE: every function below takes ``self``; they are methods of a class
# whose header lies outside this excerpt.


def clean_up(self):
    """Delete cloud artifacts created during the life-time of this object.

    Cancels the import task recorded in ``self.import_task_id`` and deletes
    the snapshot recorded in ``self.snapshot_id``.  A ``ClientError`` raised
    by either step is logged and swallowed so that the other step still runs.
    """
    try:
        # Cancel pending import-task.
        self.cancel_import_task()
    except ClientError as client_error:
        # Log the underlying error without propagating it further.
        LOGGER.exception(client_error)

    try:
        # Delete the snapshot.
        self.delete_snapshot()
    except ClientError as client_error:
        # Log the underlying error without propagating it further.
        LOGGER.exception(client_error)

    # Always reset to None to avoid re-running the code.
    self.snapshot_id = None
    self.import_task_id = None


def create_snapshot(self):
    """Create a snapshot from the uploaded s3_disk and tag it.

    Starts an ``import_snapshot`` operation for ``s3://<s3_bucket>/<s3_disk>``,
    waits for it through ``is_snapshot_ready`` and then calls ``create_tags``.

    Raises:
        RuntimeError: if the import task fails, does not complete within the
            configured number of checks, or tagging fails.  The error is
            logged before being re-raised.
    """
    try:
        description = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + '--BIGIP-Volume-From-'
        description += self.s3_disk
        LOGGER.info("Importing the disk [s3://%s/%s] as a snapshot in AWS.",
                    self.s3_bucket, self.s3_disk)
        response = self.ec2_client.import_snapshot(
            Description=description,
            DiskContainer={
                "Description": description,
                "Format": "vmdk",
                "UserBucket": {
                    "S3Bucket": self.s3_bucket,
                    "S3Key": self.s3_disk,
                },
            })
        LOGGER.trace("import_snapshot() Response => '%s'", response)
        self.import_task_id = response['ImportTaskId']
        LOGGER.info("TaskId for the import_snapshot() operation  => [%s]",
                    self.import_task_id)

        # Wait for the snapshot import to complete.  is_snapshot_ready()
        # returns False when it gives up waiting; in that case there is no
        # snapshot to tag, and import_task_id must stay set so that
        # clean_up() can still cancel the pending import task.
        if not self.is_snapshot_ready(self.import_task_id):
            raise RuntimeError(
                "import-snapshot task [{}] did not complete in time.".format(
                    self.import_task_id))

        # As the import operation successfully completed, reset it back to None
        # to avoid trying to cancel a completed import-task during clean-up.
        self.import_task_id = None

        # Tag the snapshot
        self.create_tags()
    except RuntimeError as runtime_error:
        LOGGER.exception(runtime_error)
        raise


def is_snapshot_ready(self, import_task_id):
    """Wait for the import task to complete and record the resulting snapshot id.

    Args:
        import_task_id: id of the import-snapshot task to poll.

    Returns:
        True once the task status is 'completed' (``self.snapshot_id`` is set
        from the task details), False when ``retrier.execute()`` returns a
        falsy result.

    Raises:
        RuntimeError: if the task reports the 'error' status, the describe
            call returns an empty response, or the describe call made while
            polling raises ``ClientError``.
    """
    def _is_snapshot_ready():
        """Return whether the import task currently has the 'completed' status."""
        try:
            LOGGER.trace("Querying the status of import-task [%s].", import_task_id)
            response = \
                self.ec2_client.describe_import_snapshot_tasks(
                    ImportTaskIds=[import_task_id])
            if not response:
                raise RuntimeError("describe_import_snapshot_tasks() returned none response!")
            LOGGER.trace("Response from describe_import_snapshot_tasks => '%s'",
                         response)
            task_status = response['ImportSnapshotTasks'][0]['SnapshotTaskDetail']['Status']
            if task_status == 'error':
                # Print the response before raising an exception.
                LOGGER.debug("describe_import_snapshot_tasks() response for [%s] => [%s]",
                             import_task_id, response)
                raise RuntimeError("import-snapshot task [{}] in unrecoverable 'error' state.".
                                   format(import_task_id))
            return task_status == 'completed'
        except ClientError as client_error:
            LOGGER.exception(client_error)
            raise RuntimeError("describe_import_snapshot_tasks() failed for [{}]!".
                               format(import_task_id)) from client_error

    # NOTE(review): Retrier is defined elsewhere; presumably execute() re-runs
    # the callable up to `tries` times with `delay` between attempts - confirm.
    retrier = Retrier(_is_snapshot_ready)
    retrier.tries = int(get_config_value('AWS_IMPORT_SNAPSHOT_TASK_RETRY_COUNT'))
    retrier.delay = int(get_config_value('AWS_IMPORT_SNAPSHOT_TASK_RETRY_DELAY'))
    LOGGER.info("Waiting for the import snapshot task [%s] to complete.", import_task_id)
    try:
        if retrier.execute():
            LOGGER.info("import_snapshot_task [%s] is completed.", import_task_id)
            # Call it one last time to get the snapshot_id.
            response = \
                self.ec2_client.describe_import_snapshot_tasks(
                    ImportTaskIds=[import_task_id])
            self.snapshot_id = \
                response['ImportSnapshotTasks'][0]['SnapshotTaskDetail']['SnapshotId']
            LOGGER.info("SnapshotID = [%s].", self.snapshot_id)
            return True
        LOGGER.warning("import_snapshot_task [%s] didn't complete after checking [%d] times!",
                       import_task_id, retrier.tries)
        return False
    except RuntimeError as runtime_exception:
        LOGGER.exception(runtime_exception)
        raise


def get_snapshot_tag_metadata(self):
    """Returns associated snapshot metadata tags."""
    metadata_tags = CloudImageTags(self.metadata)
    return metadata_tags.get()


def create_tags(self):
    """Create tags for the snapshot. Tags are fetched from metadata.

    Raises:
        RuntimeError: if the EC2 ``create_tags`` call raises ``ClientError``
            or ``ParamValidationError``.
    """
    snapshot_tags = self.get_snapshot_tag_metadata()
    tags_to_add = [{'Key': tag, 'Value': snapshot_tags[tag]} for tag in snapshot_tags]
    try:
        response = self.ec2_client.create_tags(Resources=[self.snapshot_id], Tags=tags_to_add)
    except (ClientError, ParamValidationError) as botocore_exception:
        LOGGER.exception(botocore_exception)
        raise RuntimeError('create_tags failed for snapshot\'{}\'!\n'
                           .format(self.snapshot_id)) from botocore_exception
    LOGGER.trace('create_tags response for snapshot %s: %s', self.snapshot_id, response)


def delete_snapshot(self):
    """Delete the AWS snapshot created by this object.

    Does nothing when ``self.snapshot_id`` is None.
    """
    if self.snapshot_id is not None:
        LOGGER.info("Deleting the snapshot '%s'.", self.snapshot_id)
        self.ec2_client.delete_snapshot(SnapshotId=self.snapshot_id)
        LOGGER.info("Successfully deleted snapshot '%s'.", self.snapshot_id)


def cancel_import_task(self):
    """Cancel an on-going import task as represented by the self.import_task_id.
    As per AWS, this only works on "pending" import tasks. For a completed task
    this would essentially be a NO-OP."""
    if self.import_task_id is not None:
        LOGGER.info("Cancelling pending import task '%s'.", self.import_task_id)
        self.ec2_client.cancel_import_task(ImportTaskId=self.import_task_id)


...  # the aws_snapshot.py excerpt is cut off here; later lines are not shown
# tables.py
Source:tables.py  
...
            count
        )

    def action(self, request, obj_id):
        """Cancel the transport task identified by ``obj_id``.

        Ids starting with "import" are passed to ``transport.cancel_import_task``
        and ids starting with "export" to ``transport.cancel_export_task``;
        any other id is ignored.
        """
        if obj_id.startswith("import"):
            transport.cancel_import_task(request, obj_id)
        elif obj_id.startswith("export"):
            transport.cancel_export_task(request, obj_id)
        else:
            pass


class InstancesFilterAction(tables.FilterAction):
    """Query-type filter action offering the task fields listed in ``filter_choices``."""
    filter_type = "query"
    # NOTE(review): each entry looks like (field name, translated label, flag);
    # the meaning of the trailing True is defined by tables.FilterAction - confirm.
    filter_choices = (('type', _("Task Type ="), True),
                      ('id', _("Task ID ="), True),
                      ('instance_id', _("Instance ID ="), True),
                      ('instance_name', _("Instance Name ="), True),
                      ('status_message', _("Status Message ="), True),
                      ('state', _("State ="), True))


class TransportTaskTable(tables.DataTable):
    type = tables.WrappingColumn("type",
...
transport.py
Source:transport.py  
...
    response = ec2_client(request).describe_import_image_tasks(ImportTaskIds=[task_id])
    return ImportTask(response.get('ImportImageTasks')[0])


def cancel_export_task(request, task_id):
    """Cancel the export task ``task_id`` using the EC2 client obtained for ``request``."""
    ec2_client(request).cancel_export_task(ExportTaskIds=[task_id])


def cancel_import_task(request, task_id):
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
