How to use the s3_client method in localstack

Best Python code snippet using localstack_python

s3_bucket.py

Source:s3_bucket.py Github

copy

Full Screen

# ... (the listing starts part-way through s3_bucket.py; earlier lines are not shown)


# NOTE(review): only the body of this helper is visible in the listing. The
# `def` line below is reconstructed from the `is_fakes3(endpoint_url)` call in
# get_s3_client -- confirm against the upstream file.
def is_fakes3(endpoint_url):
    """Return True when ``endpoint_url`` uses the ``fakes3`` or ``fakes3s`` scheme."""
    if endpoint_url is not None:
        return urlparse(endpoint_url).scheme in ('fakes3', 'fakes3s')
    else:
        return False


def get_s3_client(module, aws_connect_kwargs, location, ceph, endpoint_url):
    """Create an S3 client through ``boto3_conn`` for the requested endpoint.

    Three cases are distinguished:

    * ``ceph`` is truthy: the endpoint URL is used unchanged and ``use_ssl``
      is true only when its scheme is ``https``.
    * the endpoint uses a ``fakes3``/``fakes3s`` scheme: the URL is rewritten
      to ``http``/``https`` with an explicit port (80/443 when none is given).
    * anything else: the endpoint URL (possibly ``None``) is passed through.

    Args:
        module: Module object forwarded to ``boto3_conn``.
        aws_connect_kwargs: Extra connection keyword arguments, forwarded as-is.
        location: Region name, passed to ``boto3_conn`` as ``region``.
        ceph: Truthy to select the Ceph branch.
        endpoint_url: Custom endpoint URL, or ``None``.

    Returns:
        Whatever ``boto3_conn`` returns for the assembled parameters.
    """
    if ceph:  # TODO - test this
        # Use a separate name for the parsed URL instead of rebinding the
        # boolean ``ceph`` argument.
        parsed = urlparse(endpoint_url)
        params = dict(module=module, conn_type='client', resource='s3', use_ssl=parsed.scheme == 'https',
                      region=location, endpoint=endpoint_url, **aws_connect_kwargs)
    elif is_fakes3(endpoint_url):
        fakes3 = urlparse(endpoint_url)
        port = fakes3.port
        if fakes3.scheme == 'fakes3s':
            protocol = "https"
            if port is None:
                port = 443
        else:
            protocol = "http"
            if port is None:
                port = 80
        params = dict(module=module, conn_type='client', resource='s3', region=location,
                      endpoint="%s://%s:%s" % (protocol, fakes3.hostname, to_text(port)),
                      use_ssl=fakes3.scheme == 'fakes3s', **aws_connect_kwargs)
    else:
        params = dict(module=module, conn_type='client', resource='s3', region=location,
                      endpoint=endpoint_url, **aws_connect_kwargs)
    return boto3_conn(**params)


def main():
    """Module entry point.

    Declares the module arguments, resolves the region and endpoint, builds
    the S3 client and then creates/updates or destroys the bucket depending
    on the ``state`` parameter.
    """
    argument_spec = dict(
        force=dict(default=False, type='bool'),
        policy=dict(type='json'),
        name=dict(required=True),
        requester_pays=dict(type='bool'),
        state=dict(default='present', choices=['present', 'absent']),
        tags=dict(type='dict', aliases=['resource_tags']),
        purge_tags=dict(type='bool', default=True),
        versioning=dict(type='bool'),
        ceph=dict(default=False, type='bool', aliases=['rgw']),
        encryption=dict(choices=['none', 'AES256', 'aws:kms']),
        encryption_key_id=dict(),
        bucket_key_enabled=dict(type='bool'),
        public_access=dict(type='dict', options=dict(
            block_public_acls=dict(type='bool', default=False),
            ignore_public_acls=dict(type='bool', default=False),
            block_public_policy=dict(type='bool', default=False),
            restrict_public_buckets=dict(type='bool', default=False))),
        delete_public_access=dict(type='bool', default=False),
        object_ownership=dict(type='str', choices=['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter']),
        delete_object_ownership=dict(type='bool', default=False),
        acl=dict(type='str', choices=['private', 'public-read', 'public-read-write', 'authenticated-read']),
        validate_bucket_name=dict(type='bool', default=True),
    )

    required_by = dict(
        encryption_key_id=('encryption',),
    )

    mutually_exclusive = [
        ['public_access', 'delete_public_access'],
        ['delete_object_ownership', 'object_ownership']
    ]

    # NOTE(review): 'endpoint_url' is not declared in argument_spec above;
    # presumably it comes from the common arguments AnsibleAWSModule adds --
    # confirm against the module_utils in use.
    required_if = [
        ['ceph', True, ['endpoint_url']],
    ]

    module = AnsibleAWSModule(
        argument_spec=argument_spec,
        required_by=required_by,
        required_if=required_if,
        mutually_exclusive=mutually_exclusive
    )

    region, _ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True)

    if module.params.get('validate_bucket_name'):
        validate_bucket_name(module, module.params["name"])

    if region in ('us-east-1', '', None):
        # default to US Standard region
        location = 'us-east-1'
    else:
        # Boto uses symbolic names for locations but region strings will
        # actually work fine for everything except us-east-1 (US Standard)
        location = region

    endpoint_url = module.params.get('endpoint_url')
    ceph = module.params.get('ceph')

    # Look at endpoint_url and tweak connection settings
    # allow eucarc environment variables to be used if ansible vars aren't set
    if not endpoint_url and 'S3_URL' in os.environ:
        endpoint_url = os.environ['S3_URL']

    # if connecting to Ceph RGW, Walrus or fakes3
    if endpoint_url:
        for key in ['validate_certs', 'security_token', 'profile_name']:
            aws_connect_kwargs.pop(key, None)
    s3_client = get_s3_client(module, aws_connect_kwargs, location, ceph, endpoint_url)

    if s3_client is None:  # this should never happen
        module.fail_json(msg='Unknown error, failed to create s3 connection, no information available.')

    state = module.params.get("state")
    encryption = module.params.get("encryption")
    encryption_key_id = module.params.get("encryption_key_id")

    # Parameter validation
    if encryption_key_id is not None and encryption != 'aws:kms':
        module.fail_json(msg="Only 'aws:kms' is a valid option for encryption parameter when you specify encryption_key_id.")

    if state == 'present':
        create_or_update_bucket(s3_client, module, location)
    elif state == 'absent':
        destroy_bucket(s3_client, module)


if __name__ == '__main__':
    main()

Full Screen

Full Screen

s3-create.py

Source:s3-create.py Github

copy

Full Screen

...

Full Screen

Full Screen

s3_client.py

Source:s3_client.py Github

copy

Full Screen

import boto3
import os
import io
import json
import time
from google.protobuf import text_format
from tensorflow.python.training.checkpoint_state_pb2 import CheckpointState


# NOTE(review): this listing was recovered from a flattened page; the nesting
# below is reconstructed from the statement order -- confirm against upstream.
class SageS3Client():
    """Thin wrapper around a boto3 S3 client for one bucket and key prefix.

    It derives a fixed set of keys under ``s3_prefix`` (``ip/ip.json``,
    ``ip/hyperparameters.json``, ``ip/done`` and ``model/``) and offers
    helpers to publish/fetch the IP config, upload hyperparameters and
    upload/download model checkpoint files.
    """

    def __init__(self, bucket=None, s3_prefix=None, aws_region=None):
        """Store the connection settings and pre-compute the S3 keys used later.

        Args:
            bucket: Name of the S3 bucket all operations target.
            s3_prefix: Key prefix under which every object is stored. It is
                concatenated with string suffixes, so it must be a ``str``.
            aws_region: Region name passed to the boto3 client.
        """
        self.aws_region = aws_region
        self.bucket = bucket
        self.s3_prefix = s3_prefix
        self.config_key = os.path.normpath(s3_prefix + "/ip/ip.json")
        self.hyperparameters_key = os.path.normpath(s3_prefix + "/ip/hyperparameters.json")
        self.done_file_key = os.path.normpath(s3_prefix + "/ip/done")
        self.model_checkpoints_prefix = os.path.normpath(s3_prefix + "/model/") + "/"
        self.lock_file = ".lock"

    def get_client(self):
        """Return a new S3 client created from a fresh boto3 session."""
        session = boto3.session.Session()
        return session.client('s3', region_name=self.aws_region)

    def _get_s3_key(self, key):
        """Return ``key`` placed under the model checkpoints prefix."""
        return os.path.normpath(self.model_checkpoints_prefix + "/" + key)

    def write_ip_config(self, ip):
        """Upload ``{"IP": ip}`` as JSON, then upload the ``done`` marker object."""
        s3_client = self.get_client()
        data = {"IP": ip}
        json_blob = json.dumps(data)
        file_handle = io.BytesIO(json_blob.encode())
        file_handle_done = io.BytesIO(b'done')
        s3_client.upload_fileobj(file_handle, self.bucket, self.config_key)
        # The marker is written after the config; _wait_for_ip_upload polls for it.
        s3_client.upload_fileobj(file_handle_done, self.bucket, self.done_file_key)

    def upload_hyperparameters(self, hyperparams_json):
        """Upload the given JSON string to the hyperparameters key."""
        s3_client = self.get_client()
        file_handle = io.BytesIO(hyperparams_json.encode())
        s3_client.upload_fileobj(file_handle, self.bucket, self.hyperparameters_key)

    def upload_model(self, checkpoint_dir):
        """Upload every file found under ``./<checkpoint_dir>`` and print the count.

        Each file is stored as ``<s3_prefix>/<checkpoint_dir>/<filename>``;
        only the base file name is used, whatever sub-directory it was found in.
        """
        s3_client = self.get_client()
        num_files = 0
        for root, dirs, files in os.walk("./" + checkpoint_dir):
            for filename in files:
                abs_name = os.path.abspath(os.path.join(root, filename))
                s3_client.upload_file(abs_name,
                                      self.bucket,
                                      "%s/%s/%s" % (self.s3_prefix, checkpoint_dir, filename))
                num_files += 1
        print("Uploaded %s model files to S3" % num_files)

    def download_model(self, checkpoint_dir):
        """Download the checkpoint named in the remote ``checkpoint`` file.

        Polls every two seconds until no lock object is listed and the
        ``checkpoint`` state file can be downloaded, then downloads every
        object listed under the checkpoint path it names into
        ``checkpoint_dir``.

        Args:
            checkpoint_dir: Local directory to download into; created if missing.

        Returns:
            ``True`` once the checkpoint files have been downloaded.

        Raises:
            ValueError: Wrapping any exception raised along the way.
        """
        s3_client = self.get_client()
        try:
            filename = os.path.abspath(os.path.join(checkpoint_dir, "checkpoint"))
            if not os.path.exists(checkpoint_dir):
                os.makedirs(checkpoint_dir)

            while True:
                response = s3_client.list_objects_v2(Bucket=self.bucket,
                                                     Prefix=self._get_s3_key(self.lock_file))

                if "Contents" not in response:
                    # If no lock is found, try getting the checkpoint
                    try:
                        s3_client.download_file(Bucket=self.bucket,
                                                Key=self._get_s3_key("checkpoint"),
                                                Filename=filename)
                    except Exception:
                        print("Checkpoint not found. Waiting...")
                        time.sleep(2)
                        continue
                else:
                    # A lock object is listed: wait and poll again.
                    time.sleep(2)
                    continue

                ckpt = CheckpointState()
                if os.path.exists(filename):
                    # Close the state file deterministically instead of
                    # leaving the handle to the garbage collector.
                    with open(filename, 'r') as state_file:
                        contents = state_file.read()
                    text_format.Merge(contents, ckpt)
                    rel_path = ckpt.model_checkpoint_path
                    # The value is unused, but int() raises if the part before
                    # '_Step' is not numeric, so the statement is kept.
                    checkpoint = int(rel_path.split('_Step')[0])

                    response = s3_client.list_objects_v2(Bucket=self.bucket,
                                                         Prefix=self._get_s3_key(rel_path))
                    if "Contents" in response:
                        num_files = 0
                        for obj in response["Contents"]:
                            local_path = os.path.abspath(os.path.join(checkpoint_dir,
                                                                      obj["Key"].replace(self.model_checkpoints_prefix,
                                                                                         "")))
                            s3_client.download_file(Bucket=self.bucket,
                                                    Key=obj["Key"],
                                                    Filename=local_path)
                            num_files += 1
                        print("Downloaded %s model files from S3" % num_files)
                        return True
        except Exception as e:
            raise ValueError("Got exception: %s\n while downloading checkpoint from S3" % e) from e

    def get_ip(self):
        """Wait for the ``done`` marker, then download and return the stored IP.

        The config is downloaded to ``ip.json`` in the current working directory.

        Returns:
            The value stored under the ``"IP"`` key of the config JSON.

        Raises:
            RuntimeError: If the marker does not appear in time, or the config
                cannot be downloaded or parsed.
        """
        s3_client = self.get_client()
        self._wait_for_ip_upload()
        try:
            s3_client.download_file(self.bucket, self.config_key, 'ip.json')
            with open("ip.json") as f:
                ip = json.load(f)["IP"]
            return ip
        except Exception as e:
            # Format the cause into the message; passing it as a second
            # positional argument renders the error as a tuple.
            raise RuntimeError("Cannot fetch IP of redis server running in SageMaker: %s" % e) from e

    def _wait_for_ip_upload(self, timeout=600):
        """Poll once per second until the ``done`` marker object is listed.

        Args:
            timeout: Number of one-second polls to attempt before giving up.

        Raises:
            RuntimeError: If the marker is still missing after ``timeout`` polls.
        """
        s3_client = self.get_client()
        time_elapsed = 0
        while True:
            response = s3_client.list_objects(Bucket=self.bucket, Prefix=self.done_file_key)
            if "Contents" not in response:
                time.sleep(1)
                time_elapsed += 1
                if time_elapsed % 5 == 0:
                    print("Waiting for SageMaker Redis server IP... Time elapsed: %s seconds" % time_elapsed)
                if time_elapsed >= timeout:
                    raise RuntimeError("Cannot retrieve IP of redis server running in SageMaker")
            else:
                return

    def download_file(self, s3_key, local_path):
        """Download ``s3_key`` to ``local_path``; return True on success, False on any error."""
        s3_client = self.get_client()
        try:
            s3_client.download_file(self.bucket, s3_key, local_path)
            return True
        except Exception:
            return False

    def upload_file(self, s3_key, local_path):
        """Upload ``local_path`` to ``s3_key``; return True on success."""
        s3_client = self.get_client()
        try:
            s3_client.upload_file(Filename=local_path,
                                  Bucket=self.bucket,
                                  Key=s3_key)
            return True
        except Exception as e:
            # NOTE(review): the listing is cut off here. The sibling
            # download_file returns False on failure -- confirm upstream.
            ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful