How to use cancel_import_task method in localstack

Best Python code snippet using localstack_python

aws_snapshot.py

Source:aws_snapshot.py Github

copy

Full Screen

# ... excerpt begins at line 37 of aws_snapshot.py; earlier lines are not shown.
# NOTE(review): every function below takes `self`, so these are presumably methods
# of a class whose header lies outside this excerpt; they are shown dedented here.


def clean_up(self):
    """Delete cloud artifacts created during the life-time of this object.

    Cancels the pending import task (if one is recorded) and deletes the
    snapshot (if one is recorded). A ClientError raised by either step is
    logged and swallowed, so the second step still runs when the first fails.
    Both ids are reset afterwards.
    """
    try:
        # Cancel pending import-task.
        self.cancel_import_task()
    except ClientError as client_error:
        # Log the underlying error without propagating it further.
        LOGGER.exception(client_error)

    try:
        # Delete the snapshot.
        self.delete_snapshot()
    except ClientError as client_error:
        # Log the underlying error without propagating it further.
        LOGGER.exception(client_error)

    # Always reset to None to avoid re-running the code.
    self.snapshot_id = None
    self.import_task_id = None


def create_snapshot(self):
    """Create a snapshot from the uploaded s3_disk and tag it.

    Starts an import_snapshot() operation for s3://<s3_bucket>/<s3_disk>,
    waits for it to complete, and then tags the resulting snapshot.

    Raises:
        RuntimeError: if the import task fails, does not complete within the
            configured number of checks, or tagging fails. The error is
            logged before being re-raised.
    """
    try:
        description = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + '--BIGIP-Volume-From-'
        description += self.s3_disk
        LOGGER.info("Importing the disk [s3://%s/%s] as a snapshot in AWS.",
                    self.s3_bucket, self.s3_disk)
        response = self.ec2_client.import_snapshot(
            Description=description,
            DiskContainer={
                "Description": description,
                "Format": "vmdk",
                "UserBucket": {
                    "S3Bucket": self.s3_bucket,
                    "S3Key": self.s3_disk
                }
            })
        LOGGER.trace("import_snapshot() Response => '%s'", response)

        self.import_task_id = response['ImportTaskId']
        LOGGER.info("TaskId for the import_snapshot() operation => [%s]",
                    self.import_task_id)

        # Wait for the snapshot import to complete. is_snapshot_ready() returns
        # False when the task is still not 'completed' after all checks; in that
        # case keep import_task_id set so clean_up() can still cancel the task,
        # and do not attempt to tag a snapshot that does not exist.
        if not self.is_snapshot_ready(self.import_task_id):
            raise RuntimeError(
                "import-snapshot task [{}] did not complete; no snapshot was created.".format(
                    self.import_task_id))

        # As the import operation successfully completed, reset it back to None
        # to avoid trying to cancel a completed import-task during clean-up.
        self.import_task_id = None

        # Tag the snapshot
        self.create_tags()
    except RuntimeError as runtime_error:
        LOGGER.exception(runtime_error)
        raise


def is_snapshot_ready(self, import_task_id):
    """Wait for the given import task to reach the 'completed' status.

    On success, stores the id of the imported snapshot in self.snapshot_id.

    Args:
        import_task_id: id of the import-snapshot task to poll.

    Returns:
        True if the task completed; False if it was still not completed once
        the retrier gave up.

    Raises:
        RuntimeError: if the task is in the 'error' state, the status query
            returns an empty response, or the status query raises ClientError.
    """
    def _is_snapshot_ready():
        """Return True when the import task's status is 'completed'."""
        try:
            LOGGER.trace("Querying the status of import-task [%s].", import_task_id)
            response = self.ec2_client.describe_import_snapshot_tasks(
                ImportTaskIds=[import_task_id])
            if not response:
                raise RuntimeError("describe_import_snapshot_tasks() returned none response!")

            LOGGER.trace("Response from describe_import_snapshot_tasks => '%s'",
                         response)
            task_status = response['ImportSnapshotTasks'][0]['SnapshotTaskDetail']['Status']
            if task_status == 'error':
                # Print the response before raising an exception.
                LOGGER.debug("describe_import_snapshot_tasks() response for [%s] => [%s]",
                             import_task_id, response)
                raise RuntimeError(
                    "import-snapshot task [{}] in unrecoverable 'error' state.".format(
                        import_task_id))

            return task_status == 'completed'
        except ClientError as client_error:
            LOGGER.exception(client_error)
            raise RuntimeError(
                "describe_import_snapshot_tasks() failed for [{}]!".format(
                    import_task_id)) from client_error

    # NOTE(review): Retrier is defined elsewhere; presumably execute() re-invokes
    # the callable until it returns True or `tries` attempts are used up.
    retrier = Retrier(_is_snapshot_ready)
    retrier.tries = int(get_config_value('AWS_IMPORT_SNAPSHOT_TASK_RETRY_COUNT'))
    retrier.delay = int(get_config_value('AWS_IMPORT_SNAPSHOT_TASK_RETRY_DELAY'))
    LOGGER.info("Waiting for the import snapshot task [%s] to complete.", import_task_id)
    try:
        if retrier.execute():
            LOGGER.info("import_snapshot_task [%s] is completed.", import_task_id)
            # Call it one last time to get the snapshot_id.
            response = self.ec2_client.describe_import_snapshot_tasks(
                ImportTaskIds=[import_task_id])
            self.snapshot_id = \
                response['ImportSnapshotTasks'][0]['SnapshotTaskDetail']['SnapshotId']
            LOGGER.info("SnapshotID = [%s].", self.snapshot_id)
            return True
        LOGGER.warning("import_snapshot_task [%s] didn't complete after checking [%d] times!",
                       import_task_id, retrier.tries)
        return False
    except RuntimeError as runtime_exception:
        LOGGER.exception(runtime_exception)
        raise


def get_snapshot_tag_metadata(self):
    """Returns associated snapshot metadata tags."""
    metadata_tags = CloudImageTags(self.metadata)
    return metadata_tags.get()


def create_tags(self):
    """Create tags for the snapshot. Tags are fetched from metadata.

    Raises:
        RuntimeError: if the create_tags() call raises ClientError or
            ParamValidationError.
    """
    snapshot_tags = self.get_snapshot_tag_metadata()
    # Convert the tag mapping into the list of Key/Value dicts passed to create_tags().
    tags_to_add = [{'Key': tag, 'Value': snapshot_tags[tag]} for tag in snapshot_tags]
    try:
        response = self.ec2_client.create_tags(Resources=[self.snapshot_id], Tags=tags_to_add)
    except (ClientError, ParamValidationError) as botocore_exception:
        LOGGER.exception(botocore_exception)
        raise RuntimeError('create_tags failed for snapshot\'{}\'!\n'
                           .format(self.snapshot_id)) from botocore_exception
    LOGGER.trace('create_tags response for snapshot %s: %s', self.snapshot_id, response)


def delete_snapshot(self):
    """Delete the AWS snapshot created by this object.

    Does nothing when no snapshot id is recorded.
    """
    if self.snapshot_id is not None:
        LOGGER.info("Deleting the snapshot '%s'.", self.snapshot_id)
        self.ec2_client.delete_snapshot(SnapshotId=self.snapshot_id)
        LOGGER.info("Successfully deleted snapshot '%s'.", self.snapshot_id)


def cancel_import_task(self):
    """Cancel an on-going import task as represented by the self.import_task_id.

    As per AWS, this only works on "pending" import tasks. For a completed task
    this would essentially be a NO-OP.
    """
    if self.import_task_id is not None:
        LOGGER.info("Cancelling pending import task '%s'.", self.import_task_id)
        self.ec2_client.cancel_import_task(ImportTaskId=self.import_task_id)
# ... excerpt ends here; any remaining lines of aws_snapshot.py are not shown.

Full Screen

Full Screen

tables.py

Source:tables.py Github

copy

Full Screen

...53 count54 )55 def action(self, request, obj_id):56 if obj_id.startswith("import"):57 transport.cancel_import_task(request, obj_id)58 elif obj_id.startswith("export"):59 transport.cancel_export_task(request, obj_id)60 else:61 pass62class InstancesFilterAction(tables.FilterAction):63 filter_type = "query"64 filter_choices = (('type', _("Task Type ="), True),65 ('id', _("Task ID ="), True),66 ('instance_id', _("Instance ID ="), True),67 ('instance_name', _("Instance Name ="), True),68 ('status_message', _("Status Message ="), True),69 ('state', _("State ="), True))70class TransportTaskTable(tables.DataTable):71 type = tables.WrappingColumn("type",...

Full Screen

Full Screen

transport.py

Source:transport.py Github

copy

Full Screen

...55 response = ec2_client(request).describe_import_image_tasks(ImportTaskIds=[task_id])56 return ImportTask(response.get('ImportImageTasks')[0])57def cancel_export_task(request, task_id):58 ec2_client(request).cancel_export_task(ExportTaskIds=[task_id])59def cancel_import_task(request, task_id):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful