How to use the put_object_legal_hold method in localstack

Best Python code snippet using localstack_python

test_s3_object_lock_x.py

Source:test_s3_object_lock_x.py Github

copy

Full Screen

...503 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)504 key = 'file1'505 client.put_object(Bucket=bucket_name, Body='abc', Key=key)506 legal_hold = {'Status': 'ON'}507 response = client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold)508 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)509 response = client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})510 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)511 def test_object_lock_put_legal_hold_invalid_bucket(self, s3cfg_global_unique):512 """513 (operation='Test put legal hold with invalid bucket')514 (assertion='fails')515 """516 client = get_client(s3cfg_global_unique)517 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)518 client.create_bucket(Bucket=bucket_name)519 key = 'file1'520 client.put_object(Bucket=bucket_name, Body='abc', Key=key)521 legal_hold = {'Status': 'ON'}522 e = assert_raises(ClientError, client.put_object_legal_hold, Bucket=bucket_name, Key=key, LegalHold=legal_hold)523 status, error_code = self.get_status_and_error_code(e.response)524 self.eq(status, 400)525 self.eq(error_code, 'InvalidRequest')526 def test_object_lock_put_legal_hold_invalid_status(self, s3cfg_global_unique):527 """528 (operation='Test put legal hold with invalid status')529 (assertion='fails')530 """531 client = get_client(s3cfg_global_unique)532 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)533 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)534 key = 'file1'535 client.put_object(Bucket=bucket_name, Body='abc', Key=key)536 legal_hold = {'Status': 'abc'}537 e = assert_raises(ClientError, client.put_object_legal_hold, Bucket=bucket_name, Key=key, LegalHold=legal_hold)538 status, error_code = self.get_status_and_error_code(e.response)539 self.eq(status, 400)540 self.eq(error_code, 'MalformedXML')541 def test_object_lock_get_legal_hold(self, s3cfg_global_unique):542 """543 
(operation='Test get legal hold')544 (assertion='success')545 """546 client = get_client(s3cfg_global_unique)547 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)548 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)549 key = 'file1'550 client.put_object(Bucket=bucket_name, Body='abc', Key=key)551 legal_hold = {'Status': 'ON'}552 client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold)553 response = client.get_object_legal_hold(Bucket=bucket_name, Key=key)554 self.eq(response['LegalHold'], legal_hold)555 legal_hold_off = {'Status': 'OFF'}556 client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold_off)557 response = client.get_object_legal_hold(Bucket=bucket_name, Key=key)558 self.eq(response['LegalHold'], legal_hold_off)559 def test_object_lock_get_legal_hold_invalid_bucket(self, s3cfg_global_unique):560 """561 (operation='Test get legal hold with invalid bucket')562 (assertion='fails')563 """564 client = get_client(s3cfg_global_unique)565 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)566 client.create_bucket(Bucket=bucket_name)567 key = 'file1'568 client.put_object(Bucket=bucket_name, Body='abc', Key=key)569 e = assert_raises(ClientError, client.get_object_legal_hold, Bucket=bucket_name, Key=key)570 status, error_code = self.get_status_and_error_code(e.response)571 self.eq(status, 400)572 self.eq(error_code, 'InvalidRequest')573 def test_object_lock_delete_object_with_legal_hold_on(self, s3cfg_global_unique):574 """575 (operation='Test delete object with legal hold on')576 (assertion='fails')577 """578 client = get_client(s3cfg_global_unique)579 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)580 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)581 key = 'file1'582 response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)583 client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'ON'})584 e = 
assert_raises(ClientError, client.delete_object, Bucket=bucket_name, Key=key,585 VersionId=response['VersionId'])586 status, error_code = self.get_status_and_error_code(e.response)587 self.eq(status, 403)588 self.eq(error_code, 'AccessDenied')589 client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})590 def test_object_lock_delete_object_with_legal_hold_off(self, s3cfg_global_unique):591 """592 (operation='Test delete object with legal hold off')593 (assertion='fails')594 """595 client = get_client(s3cfg_global_unique)596 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)597 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)598 key = 'file1'599 response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)600 client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})601 response = client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'])602 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 204)603 def test_object_lock_get_obj_metadata(self, s3cfg_global_unique):604 """605 (operation='Test get object metadata')606 (assertion='success')607 """608 client = get_client(s3cfg_global_unique)609 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)610 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)611 key = 'file1'612 client.put_object(Bucket=bucket_name, Body='abc', Key=key)613 legal_hold = {'Status': 'ON'}614 client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold)615 retention = {'Mode': 'GOVERNANCE', 'RetainUntilDate': datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC)}616 client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)617 response = client.head_object(Bucket=bucket_name, Key=key)618 self.eq(response['ObjectLockMode'], retention['Mode'])619 self.eq(response['ObjectLockRetainUntilDate'], retention['RetainUntilDate'])620 
self.eq(response['ObjectLockLegalHoldStatus'], legal_hold['Status'])621 client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})622 client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'],623 BypassGovernanceRetention=True)624 def test_object_lock_uploading_obj(self, s3cfg_global_unique):625 """626 (operation='Test put legal hold and retention when uploading object')627 (assertion='success')628 """629 client = get_client(s3cfg_global_unique)630 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)631 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)632 key = 'file1'633 client.put_object(Bucket=bucket_name, Body='abc', Key=key, ObjectLockMode='GOVERNANCE',634 ObjectLockRetainUntilDate=datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC),635 ObjectLockLegalHoldStatus='ON')636 response = client.head_object(Bucket=bucket_name, Key=key)637 self.eq(response['ObjectLockMode'], 'GOVERNANCE')638 self.eq(response['ObjectLockRetainUntilDate'], datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC))639 self.eq(response['ObjectLockLegalHoldStatus'], 'ON')640 client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})641 client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'],642 BypassGovernanceRetention=True)643 def test_object_lock_changing_mode_from_governance_with_bypass(self, s3cfg_global_unique):644 """645 (operation='Test changing object retention mode from GOVERNANCE to COMPLIANCE with bypass')646 (assertion='succeeds')647 """648 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)649 client = get_client(s3cfg_global_unique)650 key = 'file1'651 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)652 # upload object with mode=GOVERNANCE653 retain_until = datetime.datetime.now(pytz.utc) + datetime.timedelta(seconds=10)654 client.put_object(Bucket=bucket_name, Body='abc', Key=key, ObjectLockMode='GOVERNANCE',...

Full Screen

Full Screen

train.py

Source:train.py Github

copy

Full Screen

...46 # obtain object lock and release it when we successfully upload to S347 try:48 s3 = boto3.client('s3')49 if event['objectLock']:50 s3.put_object_legal_hold(51 Bucket=__S3_BUCKET_NAME,52 Key=__MODEL_FILENAME,53 LegalHold={54 'Status' : 'ON',55 },56 )57 print(f"mutex lock on model.h5 in S3 obtained!")58 # write file after obtaining lock59 with open('/tmp/model.h5', 'rb') as data:60 s3.upload_fileobj(data, __S3_BUCKET_NAME, __MODEL_FILENAME)61 isUploaded = True62 print(f"successfully uploaded model to S3")63 if event['objectLock']:64 s3.put_object_legal_hold(65 Bucket=__S3_BUCKET_NAME,66 Key=__MODEL_FILENAME,67 LegalHold={68 'Status' : 'OFF',69 },70 )71 print(f"mutex lock on model.h5 in S3 released!")72 clean_local_temp_folder()73 except Exception as e:74 raise e75 return {76 "statusCode" : 200,77 "headers" : {78 "Content-Type" : "application/json"...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful