Best Python code snippet using localstack_python
s3_bucket.py
Source: s3_bucket.py
...
    if endpoint_url is not None:
        return urlparse(endpoint_url).scheme in ('fakes3', 'fakes3s')
    else:
        return False


def get_s3_client(module, aws_connect_kwargs, location, ceph, endpoint_url):
    if ceph:  # TODO - test this
        ceph = urlparse(endpoint_url)
        params = dict(module=module, conn_type='client', resource='s3', use_ssl=ceph.scheme == 'https',
                      region=location, endpoint=endpoint_url, **aws_connect_kwargs)
    elif is_fakes3(endpoint_url):
        fakes3 = urlparse(endpoint_url)
        port = fakes3.port
        if fakes3.scheme == 'fakes3s':
            protocol = "https"
            if port is None:
                port = 443
        else:
            protocol = "http"
            if port is None:
                port = 80
        params = dict(module=module, conn_type='client', resource='s3', region=location,
                      endpoint="%s://%s:%s" % (protocol, fakes3.hostname, to_text(port)),
                      use_ssl=fakes3.scheme == 'fakes3s', **aws_connect_kwargs)
    else:
        params = dict(module=module, conn_type='client', resource='s3', region=location,
                      endpoint=endpoint_url, **aws_connect_kwargs)
    return boto3_conn(**params)


def main():
    argument_spec = dict(
        force=dict(default=False, type='bool'),
        policy=dict(type='json'),
        name=dict(required=True),
        requester_pays=dict(type='bool'),
        state=dict(default='present', choices=['present', 'absent']),
        tags=dict(type='dict', aliases=['resource_tags']),
        purge_tags=dict(type='bool', default=True),
        versioning=dict(type='bool'),
        ceph=dict(default=False, type='bool', aliases=['rgw']),
        encryption=dict(choices=['none', 'AES256', 'aws:kms']),
        encryption_key_id=dict(),
        bucket_key_enabled=dict(type='bool'),
        public_access=dict(type='dict', options=dict(
            block_public_acls=dict(type='bool', default=False),
            ignore_public_acls=dict(type='bool', default=False),
            block_public_policy=dict(type='bool', default=False),
            restrict_public_buckets=dict(type='bool', default=False))),
        delete_public_access=dict(type='bool', default=False),
        object_ownership=dict(type='str', choices=['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter']),
        delete_object_ownership=dict(type='bool', default=False),
        acl=dict(type='str', choices=['private', 'public-read', 'public-read-write', 'authenticated-read']),
        validate_bucket_name=dict(type='bool', default=True),
    )
    required_by = dict(
        encryption_key_id=('encryption',),
    )
    mutually_exclusive = [
        ['public_access', 'delete_public_access'],
        ['delete_object_ownership', 'object_ownership']
    ]
    required_if = [
        ['ceph', True, ['endpoint_url']],
    ]
    module = AnsibleAWSModule(
        argument_spec=argument_spec,
        required_by=required_by,
        required_if=required_if,
        mutually_exclusive=mutually_exclusive
    )
    region, _ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True)
    if module.params.get('validate_bucket_name'):
        validate_bucket_name(module, module.params["name"])
    if region in ('us-east-1', '', None):
        # default to US Standard region
        location = 'us-east-1'
    else:
        # Boto uses symbolic names for locations but region strings will
        # actually work fine for everything except us-east-1 (US Standard)
        location = region
    endpoint_url = module.params.get('endpoint_url')
    ceph = module.params.get('ceph')
    # Look at endpoint_url and tweak connection settings
    # allow eucarc environment variables to be used if ansible vars aren't set
    if not endpoint_url and 'S3_URL' in os.environ:
        endpoint_url = os.environ['S3_URL']
    # if connecting to Ceph RGW, Walrus or fakes3
    if endpoint_url:
        for key in ['validate_certs', 'security_token', 'profile_name']:
            aws_connect_kwargs.pop(key, None)
    s3_client = get_s3_client(module, aws_connect_kwargs, location, ceph, endpoint_url)
    if s3_client is None:  # this should never happen
        module.fail_json(msg='Unknown error, failed to create s3 connection, no information available.')
    state = module.params.get("state")
    encryption = module.params.get("encryption")
    encryption_key_id = module.params.get("encryption_key_id")
    # Parameter validation
    if encryption_key_id is not None and encryption != 'aws:kms':
        module.fail_json(msg="Only 'aws:kms' is a valid option for encryption parameter when you specify encryption_key_id.")
    if state == 'present':
        create_or_update_bucket(s3_client, module, location)
    elif state == 'absent':
        destroy_bucket(s3_client, module)


if __name__ == '__main__':
    main()
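The key detail in this snippet is that get_s3_client() honors a custom endpoint_url, which is what allows the module to target S3-compatible stand-ins such as LocalStack, Ceph RGW, or fakes3 instead of AWS proper. As a minimal sketch of the same idea with plain boto3, assuming LocalStack's default edge endpoint http://localhost:4566 and its dummy "test" credentials (none of these values appear in the snippet itself):

import boto3

# Hypothetical LocalStack setup: the endpoint URL, region, and credentials
# below are illustrative assumptions, not values taken from the module above.
s3 = boto3.client(
    's3',
    region_name='us-east-1',
    endpoint_url='http://localhost:4566',  # LocalStack's default edge port
    aws_access_key_id='test',              # LocalStack accepts dummy creds
    aws_secret_access_key='test',
)
s3.create_bucket(Bucket='example-bucket')
print(s3.list_buckets()['Buckets'])

Against a fakes3 or Ceph RGW endpoint, the scheme and port handling in get_s3_client() above would apply instead of LocalStack's defaults.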
s3-create.py
Source: s3-create.py
...
s3_client.py
Source: s3_client.py
import boto3
import os
import io
import json
import time
from google.protobuf import text_format
from tensorflow.python.training.checkpoint_state_pb2 import CheckpointState


class SageS3Client():
    def __init__(self, bucket=None, s3_prefix=None, aws_region=None):
        self.aws_region = aws_region
        self.bucket = bucket
        self.s3_prefix = s3_prefix
        self.config_key = os.path.normpath(s3_prefix + "/ip/ip.json")
        self.hyperparameters_key = os.path.normpath(s3_prefix + "/ip/hyperparameters.json")
        self.done_file_key = os.path.normpath(s3_prefix + "/ip/done")
        self.model_checkpoints_prefix = os.path.normpath(s3_prefix + "/model/") + "/"
        self.lock_file = ".lock"

    def get_client(self):
        session = boto3.session.Session()
        return session.client('s3', region_name=self.aws_region)

    def _get_s3_key(self, key):
        return os.path.normpath(self.model_checkpoints_prefix + "/" + key)

    def write_ip_config(self, ip):
        s3_client = self.get_client()
        data = {"IP": ip}
        json_blob = json.dumps(data)
        file_handle = io.BytesIO(json_blob.encode())
        file_handle_done = io.BytesIO(b'done')
        s3_client.upload_fileobj(file_handle, self.bucket, self.config_key)
        s3_client.upload_fileobj(file_handle_done, self.bucket, self.done_file_key)

    def upload_hyperparameters(self, hyperparams_json):
        s3_client = self.get_client()
        file_handle = io.BytesIO(hyperparams_json.encode())
        s3_client.upload_fileobj(file_handle, self.bucket, self.hyperparameters_key)

    def upload_model(self, checkpoint_dir):
        s3_client = self.get_client()
        num_files = 0
        for root, dirs, files in os.walk("./" + checkpoint_dir):
            for filename in files:
                abs_name = os.path.abspath(os.path.join(root, filename))
                s3_client.upload_file(abs_name,
                                      self.bucket,
                                      "%s/%s/%s" % (self.s3_prefix, checkpoint_dir, filename))
                num_files += 1
        print("Uploaded %s model files to S3" % num_files)

    def download_model(self, checkpoint_dir):
        s3_client = self.get_client()
        try:
            filename = os.path.abspath(os.path.join(checkpoint_dir, "checkpoint"))
            if not os.path.exists(checkpoint_dir):
                os.makedirs(checkpoint_dir)
            while True:
                response = s3_client.list_objects_v2(Bucket=self.bucket,
                                                     Prefix=self._get_s3_key(self.lock_file))
                if "Contents" not in response:
                    # If no lock is found, try getting the checkpoint
                    try:
                        s3_client.download_file(Bucket=self.bucket,
                                                Key=self._get_s3_key("checkpoint"),
                                                Filename=filename)
                    except Exception as e:
                        print("Checkpoint not found. Waiting...")
                        time.sleep(2)
                        continue
                else:
                    time.sleep(2)
                    continue
                ckpt = CheckpointState()
                if os.path.exists(filename):
                    contents = open(filename, 'r').read()
                    text_format.Merge(contents, ckpt)
                    rel_path = ckpt.model_checkpoint_path
                    checkpoint = int(rel_path.split('_Step')[0])
                    response = s3_client.list_objects_v2(Bucket=self.bucket,
                                                         Prefix=self._get_s3_key(rel_path))
                    if "Contents" in response:
                        num_files = 0
                        for obj in response["Contents"]:
                            filename = os.path.abspath(os.path.join(checkpoint_dir,
                                                                    obj["Key"].replace(self.model_checkpoints_prefix,
                                                                                       "")))
                            s3_client.download_file(Bucket=self.bucket,
                                                    Key=obj["Key"],
                                                    Filename=filename)
                            num_files += 1
                        print("Downloaded %s model files from S3" % num_files)
                        return True
        except Exception as e:
            raise ValueError("Got exception: %s\n while downloading checkpoint from S3" % e)

    def get_ip(self):
        s3_client = self.get_client()
        self._wait_for_ip_upload()
        try:
            s3_client.download_file(self.bucket, self.config_key, 'ip.json')
            with open("ip.json") as f:
                ip = json.load(f)["IP"]
            return ip
        except Exception as e:
            raise RuntimeError("Cannot fetch IP of redis server running in SageMaker:", e)

    def _wait_for_ip_upload(self, timeout=600):
        s3_client = self.get_client()
        time_elapsed = 0
        while True:
            response = s3_client.list_objects(Bucket=self.bucket, Prefix=self.done_file_key)
            if "Contents" not in response:
                time.sleep(1)
                time_elapsed += 1
                if time_elapsed % 5 == 0:
                    print("Waiting for SageMaker Redis server IP... Time elapsed: %s seconds" % time_elapsed)
                if time_elapsed >= timeout:
                    raise RuntimeError("Cannot retrieve IP of redis server running in SageMaker")
            else:
                return

    def download_file(self, s3_key, local_path):
        s3_client = self.get_client()
        try:
            s3_client.download_file(self.bucket, s3_key, local_path)
            return True
        except Exception as e:
            return False

    def upload_file(self, s3_key, local_path):
        s3_client = self.get_client()
        try:
            s3_client.upload_file(Filename=local_path,
                                  Bucket=self.bucket,
                                  Key=s3_key)
            return True
        except Exception as e:
...
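A minimal usage sketch for SageS3Client follows. It assumes the bucket already exists, that boto3 can find credentials in the environment, and that the bucket name, prefix, and IP value are placeholders invented for illustration (none of them come from the snippet):

# Hypothetical usage of the SageS3Client class defined above.
client = SageS3Client(bucket="my-training-bucket",
                      s3_prefix="jobs/job-001",
                      aws_region="us-east-1")
client.write_ip_config("10.0.0.5")  # publishes ip.json plus the done marker
ok = client.upload_file("jobs/job-001/config.yaml", "./config.yaml")
if ok:
    client.download_file("jobs/job-001/config.yaml", "/tmp/config.yaml")

Note that upload_file() and download_file() return booleans rather than raising, so callers are expected to check the result, as shown.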
