How to use get_or_create_bucket method in localstack

Best Python code snippet using localstack_python

helper.py

Source:helper.py Github

copy

Full Screen

...4from boto.s3.connection import Location5def build_connect_s3():6 s3 = boto.connect_s3(settings.AWS_ACCESS_KEY_ID, settings.AWS_SECRET_ACCESS_KEY, is_secure=False)7 return s38def get_or_create_bucket(bucket_name, policy='public-read', location=Location.DEFAULT):9 s3 = build_connect_s3()10 bucket = s3.lookup(bucket_name)11 if not bucket:12 try:13 bucket = s3.create_bucket(bucket_name, policy=policy, location=location)14 bucket.set_canned_acl('public-read')15 except s3.provider.storage_create_error:16 print 'Bucket (%s) is owned by another user' % bucket_name17 return bucket18def get_data_to_string(bucket_name, key_name):19 #get_contents_to_filename,get_file20 bucket = get_or_create_bucket(bucket_name)21 key = bucket.lookup(key_name)22 if not key:23 return None24 return key.get_contents_as_string()25def get_data_to_filename(bucket_name, key_name, filename):26 #get_contents_to_filename,get_file27 bucket = get_or_create_bucket(bucket_name)28 key = bucket.lookup(key_name)29 if not key:30 return None31 return key.get_contents_to_filename(filename)32def store_data_from_filename(bucket_name, key_name, path_source_file, metadata=None, headers=None, policy='public-read'):33 bucket = get_or_create_bucket(bucket_name, policy)34 key = bucket.new_key(key_name)35 key.set_contents_from_filename(path_source_file, headers=headers, policy=policy)36 if metadata:37 key.metadata.update(metadata)38 return key39def store_data_from_stream(bucket_name, key_name, stream, metadata=None, headers=None, policy='public-read'):40 bucket = get_or_create_bucket(bucket_name, policy)41 key = bucket.new_key(key_name)42 key.set_contents_from_stream(stream, headers=headers, policy=policy)43 if metadata:44 key.metadata.update(metadata)45 return key46def store_data_from_string(bucket_name, key_name, need_store_string, metadata=None, headers=None, policy='public-read'):47 bucket = get_or_create_bucket(bucket_name, policy)48 key = bucket.new_key(key_name)49 key.set_contents_from_string(need_store_string, headers=headers, policy=policy)50 if metadata:51 key.metadata.update(metadata)52 return key53def modify_metadata(bucket_name, key_name, metadata):54 bucket = get_or_create_bucket(bucket_name)55 key = bucket.lookup(key_name)56 if key:57 key.copy(bucket.name, key.name, metadata, preserve_acl=True)58 return key59def enable_logging(bucket_name, log_bucket_name, log_prefix=None):60 s3 = build_connect_s3()61 bucket = get_or_create_bucket(bucket_name)62 log_bucket = s3.lookup(log_bucket_name)63 log_bucket.set_as_logging_target()64 bucket.enable_logging(log_bucket, target_prefix=log_prefix)65 return None66def disable_logging(bucket_name):67 bucket = get_or_create_bucket(bucket_name)68 bucket.disable_logging()69 return None70def bucket_du(bucket_name):71 bucket = get_or_create_bucket(bucket_name)72 total_bytes = 073 if bucket:74 for key in bucket:75 total_bytes += key.size76 return total_bytes77def get_expire_data_url(bucket_name, key_name, expires_seconds):#该URL地址有过期时间78 bucket = get_or_create_bucket(bucket_name)79 key = bucket.lookup(key_name)80 if not key:81 return None82 return key.generate_url(expires_seconds)83def get_data_url(bucket_name, key_name):84 if settings.IS_PRODUCTION_SERVER:85 domain = 's3.itc.cn'86 else:87 domain = 's3.amazonaws.com'88 url = 'http://%s.%s/%s' % (bucket_name, domain, key_name)89 return url90def set_bucket_acl(bucket_name, policy):91 '''92 POLICY: 'private', 'public-read','public-read-write', 93 'authenticated-read', 'bucket-owner-read', 'bucket-owner-full-control'94 '''95 bucket = get_or_create_bucket(bucket_name, policy)96 bucket.set_acl(policy)97 98 return None99def get_bucket_acl(bucket_name):100 bucket = get_or_create_bucket(bucket_name)...

Full Screen

Full Screen

minio_utils.py

Source:minio_utils.py Github

copy

Full Screen

...3import socket4from minio import Minio5from settings import MINIO6mc = Minio(f"{MINIO['HOST']}:{MINIO['PORT']}", access_key=MINIO["ACCESS_KEY"], secret_key=MINIO["SECRET_KEY"], secure=False)7def get_or_create_bucket(bucket_name):8 """Create the bucket if not already present"""9 if not mc.bucket_exists(bucket_name):10 mc.make_bucket(bucket_name)11 return bucket_name12def backup_log_file(_file):13 """Backup the log file to minio bucket"""14 try:15 get_or_create_bucket(MINIO["BUCKET"]["FILE_STORE"])16 object_name = f"Server {socket.gethostname()} - {_file.split('/')[-1]}"17 mc.fput_object(MINIO["BUCKET"]["FILE_STORE"], object_name, _file, content_type="application/text")18 except Exception as e:19 print("Unable to backup log file to minio", e)20async def update_record_store(data):21 """Update the metadata bucket on minio with log data"""22 get_or_create_bucket(MINIO["BUCKET"]["RECORD_STORE"])23 object_name = f"{data['timestamp']}/{data['clientip']}"...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful