How to use put_object_lock_configuration method in localstack

Best Python code snippet using localstack_python

test_s3_object_lock_x.py

Source:test_s3_object_lock_x.py Github

copy

Full Screen

...18 'Mode': 'GOVERNANCE',19 'Days': 120 }21 }}22 response = client.put_object_lock_configuration(23 Bucket=bucket_name,24 ObjectLockConfiguration=conf)25 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)26 conf = {'ObjectLockEnabled': 'Enabled',27 'Rule': {28 'DefaultRetention': {29 'Mode': 'COMPLIANCE',30 'Years': 131 }32 }}33 response = client.put_object_lock_configuration(34 Bucket=bucket_name,35 ObjectLockConfiguration=conf)36 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)37 response = client.get_bucket_versioning(Bucket=bucket_name) # ?38 self.eq(response['Status'], 'Enabled')39 def test_object_lock_put_obj_lock_invalid_bucket(self, s3cfg_global_unique):40 """41 (operation='Test put object lock with bucket object lock not enabled')42 (assertion='fails')43 """44 client = get_client(s3cfg_global_unique)45 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)46 client.create_bucket(Bucket=bucket_name)47 conf = {'ObjectLockEnabled': 'Enabled',48 'Rule': {49 'DefaultRetention': {50 'Mode': 'GOVERNANCE',51 'Days': 152 }53 }}54 e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name,55 ObjectLockConfiguration=conf)56 status, error_code = self.get_status_and_error_code(e.response)57 self.eq(status, 409)58 self.eq(error_code, 'InvalidBucketState')59 def test_object_lock_put_obj_lock_with_days_and_years(self, s3cfg_global_unique):60 """61 (operation='Test put object lock with days and years')62 (assertion='fails')63 """64 client = get_client(s3cfg_global_unique)65 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)66 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)67 conf = {'ObjectLockEnabled': 'Enabled',68 'Rule': {69 'DefaultRetention': {70 'Mode': 'GOVERNANCE',71 'Days': 1,72 'Years': 173 }74 }}75 e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name,76 ObjectLockConfiguration=conf)77 status, error_code = self.get_status_and_error_code(e.response)78 self.eq(status, 400)79 self.eq(error_code, 'MalformedXML')80 def test_object_lock_put_obj_lock_invalid_days(self, s3cfg_global_unique):81 """82 (operation='Test put object lock with invalid days')83 (assertion='fails')84 """85 client = get_client(s3cfg_global_unique)86 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)87 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)88 conf = {'ObjectLockEnabled': 'Enabled',89 'Rule': {90 'DefaultRetention': {91 'Mode': 'GOVERNANCE',92 'Days': 093 }94 }}95 e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name,96 ObjectLockConfiguration=conf)97 status, error_code = self.get_status_and_error_code(e.response)98 self.eq(status, 400)99 self.eq(error_code, 'InvalidRetentionPeriod')100 def test_object_lock_put_obj_lock_invalid_years1(self, s3cfg_global_unique):101 """102 (operation='Test put object lock with invalid years')103 (assertion='fails')104 """105 client = get_client(s3cfg_global_unique)106 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)107 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)108 conf = {'ObjectLockEnabled': 'Enabled',109 'Rule': {110 'DefaultRetention': {111 'Mode': 'GOVERNANCE',112 'Years': -1113 }114 }}115 e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name,116 ObjectLockConfiguration=conf)117 status, error_code = self.get_status_and_error_code(e.response)118 self.eq(status, 400)119 self.eq(error_code, 'InvalidRetentionPeriod')120 def test_object_lock_put_obj_lock_invalid_years2(self, s3cfg_global_unique):121 """122 (operation='Test put object lock with invalid mode')123 (assertion='fails')124 """125 client = get_client(s3cfg_global_unique)126 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)127 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)128 conf = {'ObjectLockEnabled': 'Enabled',129 'Rule': {130 'DefaultRetention': {131 'Mode': 'abc',132 'Years': 1133 }134 }}135 e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name,136 ObjectLockConfiguration=conf)137 status, error_code = self.get_status_and_error_code(e.response)138 self.eq(status, 400)139 self.eq(error_code, 'MalformedXML')140 conf = {'ObjectLockEnabled': 'Enabled',141 'Rule': {142 'DefaultRetention': {143 'Mode': 'governance',144 'Years': 1145 }146 }}147 e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name,148 ObjectLockConfiguration=conf)149 status, error_code = self.get_status_and_error_code(e.response)150 self.eq(status, 400)151 self.eq(error_code, 'MalformedXML')152 def test_object_lock_put_obj_lock_invalid_status(self, s3cfg_global_unique):153 """154 (operation='Test put object lock with invalid status')155 (assertion='fails')156 """157 client = get_client(s3cfg_global_unique)158 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)159 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)160 conf = {'ObjectLockEnabled': 'Disabled',161 'Rule': {162 'DefaultRetention': {163 'Mode': 'GOVERNANCE',164 'Years': 1165 }166 }}167 e = assert_raises(ClientError, client.put_object_lock_configuration, Bucket=bucket_name,168 ObjectLockConfiguration=conf)169 status, error_code = self.get_status_and_error_code(e.response)170 self.eq(status, 400)171 self.eq(error_code, 'MalformedXML')172 def test_object_lock_suspend_versioning(self, s3cfg_global_unique):173 """174 (operation='Test suspend versioning when object lock enabled')175 (assertion='fails')176 """177 client = get_client(s3cfg_global_unique)178 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)179 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)180 e = assert_raises(ClientError, client.put_bucket_versioning, Bucket=bucket_name,181 VersioningConfiguration={'Status': 'Suspended'})182 status, error_code = self.get_status_and_error_code(e.response)183 self.eq(status, 409)184 self.eq(error_code, 'InvalidBucketState')185 def test_object_lock_get_obj_lock(self, s3cfg_global_unique):186 """187 (operation='Test get object lock')188 (assertion='success')189 """190 client = get_client(s3cfg_global_unique)191 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)192 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)193 conf = {'ObjectLockEnabled': 'Enabled',194 'Rule': {195 'DefaultRetention': {196 'Mode': 'GOVERNANCE',197 'Days': 1198 }199 }}200 client.put_object_lock_configuration(201 Bucket=bucket_name,202 ObjectLockConfiguration=conf)203 response = client.get_object_lock_configuration(Bucket=bucket_name)204 self.eq(response['ObjectLockConfiguration'], conf)205 def test_object_lock_get_obj_lock_invalid_bucket(self, s3cfg_global_unique):206 """207 (operation='Test get object lock with bucket object lock not enabled')208 (assertion='fails')209 """210 client = get_client(s3cfg_global_unique)211 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)212 client.create_bucket(Bucket=bucket_name)213 e = assert_raises(ClientError, client.get_object_lock_configuration, Bucket=bucket_name)214 status, error_code = self.get_status_and_error_code(e.response)215 self.eq(status, 404)216 self.eq(error_code, 'ObjectLockConfigurationNotFoundError')217 def test_object_lock_put_obj_retention(self, s3cfg_global_unique):218 """219 (operation='Test put object retention')220 (assertion='success')221 """222 client = get_client(s3cfg_global_unique)223 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)224 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)225 key = 'file1'226 response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)227 version_id = response['VersionId']228 retention = {'Mode': 'GOVERNANCE', 'RetainUntilDate': datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC)}229 response = client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)230 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)231 client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)232 def test_object_lock_put_obj_retention_invalid_bucket(self, s3cfg_global_unique):233 """234 (operation='Test put object retention with bucket object lock not enabled')235 (assertion='fails')236 """237 client = get_client(s3cfg_global_unique)238 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)239 client.create_bucket(Bucket=bucket_name)240 key = 'file1'241 client.put_object(Bucket=bucket_name, Body='abc', Key=key)242 retention = {'Mode': 'GOVERNANCE', 'RetainUntilDate': datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC)}243 e = assert_raises(ClientError, client.put_object_retention, Bucket=bucket_name, Key=key, Retention=retention)244 status, error_code = self.get_status_and_error_code(e.response)245 self.eq(status, 400)246 self.eq(error_code, 'InvalidRequest')247 def test_object_lock_put_obj_retention_invalid_mode(self, s3cfg_global_unique):248 """249 (operation='Test put object retention with invalid mode')250 (assertion='fails')251 """252 client = get_client(s3cfg_global_unique)253 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)254 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)255 key = 'file1'256 client.put_object(Bucket=bucket_name, Body='abc', Key=key)257 retention = {'Mode': 'governance', 'RetainUntilDate': datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC)}258 e = assert_raises(ClientError, client.put_object_retention, Bucket=bucket_name, Key=key, Retention=retention)259 status, error_code = self.get_status_and_error_code(e.response)260 self.eq(status, 400)261 self.eq(error_code, 'MalformedXML')262 retention = {'Mode': 'abc', 'RetainUntilDate': datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC)}263 e = assert_raises(ClientError, client.put_object_retention, Bucket=bucket_name, Key=key, Retention=retention)264 status, error_code = self.get_status_and_error_code(e.response)265 self.eq(status, 400)266 self.eq(error_code, 'MalformedXML')267 def test_object_lock_get_obj_retention(self, s3cfg_global_unique):268 """269 (operation='Test get object retention')270 (assertion='success')271 """272 client = get_client(s3cfg_global_unique)273 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)274 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)275 key = 'file1'276 response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)277 version_id = response['VersionId']278 retention = {'Mode': 'GOVERNANCE', 'RetainUntilDate': datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC)}279 client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)280 response = client.get_object_retention(Bucket=bucket_name, Key=key)281 self.eq(response['Retention'], retention)282 client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)283 def test_object_lock_get_obj_retention_iso8601(self, s3cfg_global_unique):284 """285 (operation='Test object retention date formatting')286 (assertion='success')287 """288 client = get_client(s3cfg_global_unique)289 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)290 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)291 key = 'file1'292 response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)293 version_id = response['VersionId']294 date = datetime.datetime.today() + datetime.timedelta(days=365)295 retention = {'Mode': 'GOVERNANCE', 'RetainUntilDate': date}296 http_response = None297 def get_http_response(**kwargs):298 nonlocal http_response299 http_response = kwargs['http_response'].__dict__300 client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)301 client.meta.events.register('after-call.s3.HeadObject', get_http_response)302 client.head_object(Bucket=bucket_name, VersionId=version_id, Key=key)303 retain_date = http_response['headers']['x-amz-object-lock-retain-until-date']304 isodate.parse_datetime(retain_date)305 client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)306 def test_object_lock_get_obj_retention_invalid_bucket(self, s3cfg_global_unique):307 """308 (operation='Test get object retention with invalid bucket')309 (assertion='fails')310 """311 client = get_client(s3cfg_global_unique)312 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)313 client.create_bucket(Bucket=bucket_name)314 key = 'file1'315 client.put_object(Bucket=bucket_name, Body='abc', Key=key)316 e = assert_raises(ClientError, client.get_object_retention, Bucket=bucket_name, Key=key)317 status, error_code = self.get_status_and_error_code(e.response)318 self.eq(status, 400)319 self.eq(error_code, 'InvalidRequest')320 def test_object_lock_put_obj_retention_version_id(self, s3cfg_global_unique):321 """322 (operation='Test put object retention with version id')323 (assertion='success')324 """325 client = get_client(s3cfg_global_unique)326 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)327 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)328 key = 'file1'329 client.put_object(Bucket=bucket_name, Body='abc', Key=key)330 response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)331 version_id = response['VersionId']332 retention = {'Mode': 'GOVERNANCE', 'RetainUntilDate': datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC)}333 client.put_object_retention(Bucket=bucket_name, Key=key, VersionId=version_id, Retention=retention)334 response = client.get_object_retention(Bucket=bucket_name, Key=key, VersionId=version_id)335 self.eq(response['Retention'], retention)336 client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)337 def test_object_lock_put_obj_retention_override_default_retention(self, s3cfg_global_unique):338 """339 (operation='Test put object retention to override default retention')340 (assertion='success')341 """342 client = get_client(s3cfg_global_unique)343 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)344 client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)345 conf = {'ObjectLockEnabled': 'Enabled',346 'Rule': {347 'DefaultRetention': {348 'Mode': 'GOVERNANCE',349 'Days': 1350 }351 }}352 client.put_object_lock_configuration(353 Bucket=bucket_name,354 ObjectLockConfiguration=conf)355 key = 'file1'356 response = client.put_object(Bucket=bucket_name, Body='abc', Key=key)357 version_id = response['VersionId']358 retention = {'Mode': 'GOVERNANCE', 'RetainUntilDate': datetime.datetime(2030, 1, 1, tzinfo=pytz.UTC)}359 client.put_object_retention(Bucket=bucket_name, Key=key, Retention=retention)360 response = client.get_object_retention(Bucket=bucket_name, Key=key)361 self.eq(response['Retention'], retention)362 client.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id, BypassGovernanceRetention=True)363 def test_object_lock_put_obj_retention_increase_period(self, s3cfg_global_unique):364 """365 (operation='Test put object retention to increase retention period')366 (assertion='success')...

Full Screen

Full Screen

enable_worm.py

Source:enable_worm.py Github

copy

Full Screen

...30 }31 }32 }33print "Configuring WORM %s on bucket : %s"%(worm_config, bucket)34s3c.put_object_lock_configuration(Bucket=bucket, ObjectLockConfiguration=worm_config)35#Get WORM configured on Bucket36print "WORM policies on bucket : %s"%(bucket)37print json.dumps(s3c.get_object_lock_configuration(Bucket=bucket), indent=4)38#Upload Object to bucket39res = s3c.put_object(Bucket=bucket, Key=objname)40versionid = res["VersionId"]41#Delete Object Version42print "Deleting object %s (version : %s)"%(objname, versionid)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful