Best Python code snippet using localstack_python
test_s3_object_lock_x.py
Source:test_s3_object_lock_x.py  
...503        client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)504        key = 'file1'505        client.put_object(Bucket=bucket_name, Body='abc', Key=key)506        legal_hold = {'Status': 'ON'}507        response = client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold)508        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)509        response = client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})510        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)511    def test_object_lock_put_legal_hold_invalid_bucket(self, s3cfg_global_unique):512        """513        (operation='Test put legal hold with invalid bucket')514        (assertion='fails')515        """516        client = get_client(s3cfg_global_unique)517        bucket_name = self.get_new_bucket_name(s3cfg_global_unique)518        client.create_bucket(Bucket=bucket_name)519        key = 'file1'520        client.put_object(Bucket=bucket_name, Body='abc', Key=key)521        legal_hold = {'Status': 'ON'}522        e = assert_raises(ClientError, client.put_object_legal_hold, Bucket=bucket_name, Key=key, LegalHold=legal_hold)523        status, error_code = self.get_status_and_error_code(e.response)524        self.eq(status, 400)525        self.eq(error_code, 'InvalidRequest')526    def test_object_lock_put_legal_hold_invalid_status(self, s3cfg_global_unique):527        """528        (operation='Test put legal hold with invalid status')529        (assertion='fails')530        """531        client = get_client(s3cfg_global_unique)532        bucket_name = self.get_new_bucket_name(s3cfg_global_unique)533        client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)534        key = 'file1'535        client.put_object(Bucket=bucket_name, Body='abc', Key=key)536        legal_hold = {'Status': 'abc'}537        e = assert_raises(ClientError, client.put_object_legal_hold, Bucket=bucket_name, 
Key=key, LegalHold=legal_hold)538        status, error_code = self.get_status_and_error_code(e.response)539        self.eq(status, 400)540        self.eq(error_code, 'MalformedXML')541    def test_object_lock_get_legal_hold(self, s3cfg_global_unique):542        """543        (operation='Test get legal hold')544        (assertion='success')545        """546        client = get_client(s3cfg_global_unique)547        bucket_name = self.get_new_bucket_name(s3cfg_global_unique)548        client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)549        key = 'file1'550        client.put_object(Bucket=bucket_name, Body='abc', Key=key)551        legal_hold = {'Status': 'ON'}552        client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold)553        response = client.get_object_legal_hold(Bucket=bucket_name, Key=key)554        self.eq(response['LegalHold'], legal_hold)555        legal_hold_off = {'Status': 'OFF'}556        client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold_off)557        response = client.get_object_legal_hold(Bucket=bucket_name, Key=key)558        self.eq(response['LegalHold'], legal_hold_off)559    def test_object_lock_get_legal_hold_invalid_bucket(self, s3cfg_global_unique):560        """561        (operation='Test get legal hold with invalid bucket')562        (assertion='fails')563        """564        client = get_client(s3cfg_global_unique)565        bucket_name = self.get_new_bucket_name(s3cfg_global_unique)566        client.create_bucket(Bucket=bucket_name)567        key = 'file1'568        client.put_object(Bucket=bucket_name, Body='abc', Key=key)569        e = assert_raises(ClientError, client.get_object_legal_hold, Bucket=bucket_name, Key=key)570        status, error_code = self.get_status_and_error_code(e.response)571        self.eq(status, 400)572        self.eq(error_code, 'InvalidRequest')573    def test_object_lock_delete_object_with_legal_hold_on(self, 
s3cfg_global_unique):574        """575        (operation='Test delete object with legal hold on')576        (assertion='fails')577        """578        client = get_client(s3cfg_global_unique)579        bucket_name = self.get_new_bucket_name(s3cfg_global_unique)580        client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)581        key = 'file1'582        response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)583        client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'ON'})584        e = assert_raises(ClientError, client.delete_object, Bucket=bucket_name, Key=key,585                          VersionId=response['VersionId'])586        status, error_code = self.get_status_and_error_code(e.response)587        self.eq(status, 403)588        self.eq(error_code, 'AccessDenied')589        client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})590    def test_object_lock_delete_object_with_legal_hold_off(self, s3cfg_global_unique):591        """592        (operation='Test delete object with legal hold off')593        (assertion='fails')594        """595        client = get_client(s3cfg_global_unique)596        bucket_name = self.get_new_bucket_name(s3cfg_global_unique)597        client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)598        key = 'file1'599        response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)600        client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})601        response = client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'])602        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 204)603    def test_object_lock_get_obj_metadata(self, s3cfg_global_unique):604        """605        (operation='Test get object metadata')606        (assertion='success')607        """608        client = get_client(s3cfg_global_unique)609        bucket_name = 
self.get_new_bucket_name(s3cfg_global_unique)610        client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)611        key = 'file1'612        client.put_object(Bucket=bucket_name, Body='abc', Key=key)613        legal_hold = {'Status': 'ON'}614        client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold=legal_hold)615        retention = {'Mode': 'GOVERNANCE', 'RetainUntilDate': datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC)}616        client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)617        response = client.head_object(Bucket=bucket_name, Key=key)618        self.eq(response['ObjectLockMode'], retention['Mode'])619        self.eq(response['ObjectLockRetainUntilDate'], retention['RetainUntilDate'])620        self.eq(response['ObjectLockLegalHoldStatus'], legal_hold['Status'])621        client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})622        client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'],623                             BypassGovernanceRetention=True)624    def test_object_lock_uploading_obj(self, s3cfg_global_unique):625        """626        (operation='Test put legal hold and retention when uploading object')627        (assertion='success')628        """629        client = get_client(s3cfg_global_unique)630        bucket_name = self.get_new_bucket_name(s3cfg_global_unique)631        client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)632        key = 'file1'633        client.put_object(Bucket=bucket_name, Body='abc', Key=key, ObjectLockMode='GOVERNANCE',634                          ObjectLockRetainUntilDate=datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC),635                          ObjectLockLegalHoldStatus='ON')636        response = client.head_object(Bucket=bucket_name, Key=key)637        self.eq(response['ObjectLockMode'], 'GOVERNANCE')638        self.eq(response['ObjectLockRetainUntilDate'], 
datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC))639        self.eq(response['ObjectLockLegalHoldStatus'], 'ON')640        client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status': 'OFF'})641        client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'],642                             BypassGovernanceRetention=True)643    def test_object_lock_changing_mode_from_governance_with_bypass(self, s3cfg_global_unique):644        """645        (operation='Test changing object retention mode from GOVERNANCE to COMPLIANCE with bypass')646        (assertion='succeeds')647        """648        bucket_name = self.get_new_bucket_name(s3cfg_global_unique)649        client = get_client(s3cfg_global_unique)650        key = 'file1'651        client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)652        # upload object with mode=GOVERNANCE653        retain_until = datetime.datetime.now(pytz.utc) + datetime.timedelta(seconds=10)654        client.put_object(Bucket=bucket_name, Body='abc', Key=key, ObjectLockMode='GOVERNANCE',...train.py
Source:train.py  
...46    # obtain object lock and release it when we successfully upload to S347    try:48        s3 = boto3.client('s3')49        if event['objectLock']:50            s3.put_object_legal_hold(51                Bucket=__S3_BUCKET_NAME,52                Key=__MODEL_FILENAME,53                LegalHold={54                    'Status' : 'ON',55                },56            )57        print(f"mutex lock on model.h5 in S3 obtained!")58        # write file after obtaining lock59        with open('/tmp/model.h5', 'rb') as data:60            s3.upload_fileobj(data, __S3_BUCKET_NAME, __MODEL_FILENAME)61        isUploaded = True62        print(f"successfully uploaded model to S3")63        if event['objectLock']:64            s3.put_object_legal_hold(65                Bucket=__S3_BUCKET_NAME,66                Key=__MODEL_FILENAME,67                LegalHold={68                    'Status' : 'OFF',69                },70            )71        print(f"mutex lock on model.h5 in S3 released!")72        clean_local_temp_folder()73    except Exception as e:74        raise e75    return {76        "statusCode" : 200,77        "headers" : {78            "Content-Type" : "application/json"...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
