Best Python code snippet using localstack_python
docker_in_worker.py
Source:docker_in_worker.py  
...190        token = ''191        if 'access_token' in connection.extra_dejson.keys():192            token = connection.extra_dejson['access_token']193        print(f"Registering data to {server}")194        template = get_record_template()195        196        r = create_draft_record(server=server, token=token, record=template)197        print(f"record {r}")198        if 'id' in r:199            print(f"Draft record created {r['id']} --> {r['links']['self']}")200        else:201            print('Something went wrong with registration', r, r.text)202            return -1203        204        for f in output_files:205            print(f"Uploading {f}")206            _ = add_file(record=r, fname=f, token=token, remote=f)207            # delete local208            # os.unlink(local)...lambda_timestream_load_from_dump_to_s3_json.py
Source:lambda_timestream_load_from_dump_to_s3_json.py  
...73        return obj74    except Exception as e:75        msg = "Error when fetching the S3 object: {}".format(e)76        raise RuntimeError(msg)77def get_record_template():78    return {79        'Dimensions': [],80        'MeasureName': '',81        'MeasureValue': '',82        'MeasureValueType': '',83        'Time': '',84        'TimeUnit': 'MILLISECONDS'85    }86def get_dim_template():87    return {88        'Name': '',89        'Value': '',90        'DimensionValueType': ''91    }92def get_timestamp(date):93    return int(1000 * parser.parse(date + 'Z').timestamp())94def write_to_timestream(records):95    try:96        response = tsw.write_records(DatabaseName=DB,97                                     TableName=TB,98                                     Records=records)99        log_me("Response from Timestream write: {}".format(response))100    except tsw.exceptions.RejectedRecordsException as e:101        print("Timestream returned a RejectedRecordsException")102        print(e.response)103    except Exception as e:104        msg = "Exception when writing to timestream: {}".format(e)105        raise RuntimeError(msg)106def lambda_handler(event, context):107    log_me("starting")108    check_table_exists(DB, TB)109    for record in event.get('Records', [{}]):110        try:111            bucket = record['messageAttributes']['bucket']['stringValue']112            key = record['messageAttributes']['key']['stringValue']113            if bucket is None or key is None:114                raise RuntimeError("Bucket or Key missing in SQS messageAttributes: "115                                   "bucket='{}', key='{}'".format(bucket, key))116            log_me("Fetching object '{}/{}'".format(bucket, key))117            obj = get_s3_object(bucket, key)118            print("Document '{}/{}' successfully read".format(bucket, key))119            records = []120            print('Found {} rows to ingest'.format(len(obj["Rows"])))121            rows_count = 0122        
    for row in obj["Rows"]:123                record = get_record_template()124                for idx, data in enumerate(row["Data"]):125                    k, v = list(data.items())[0]126                    name = obj["ColumnInfo"][idx]["Name"]127                    tpe = obj["ColumnInfo"][idx]["Type"]["ScalarType"]128                    if k != "ScalarValue":129                        # NullValues cannot be written to Timestream. Log other types when detected130                        if k != 'NullValue':131                            print("Ignoring unsupported Type: {}".format(k))132                        log_me("Skipping row: index {}, data {}".format(idx, data))133                        continue134                    if name == 'time':135                        # This is a timestamp136                        record["Time"] = str(get_timestamp(v))137                    elif name.startswith("measure_value"):...test_b2shareoperator.py
Source:test_b2shareoperator.py  
...77            get.return_value = m78            r = get_objects(server='foo')79            self.assertListEqual(['a', 'b'], r)80    def test_upload(self):81        template = get_record_template()82        server='https://b2share-testing.fz-juelich.de/'83        token = ''84        with patch('dags.b2shareoperator.requests.post') as post: 85            r = create_draft_record(server=server, token=token, record=template)86        r = dict()87        r['links']={'files':server, 'self': server}88        with patch('dags.b2shareoperator.requests.post') as put:89            a = tempfile.NamedTemporaryFile()90            a.write(b"some content")91            up = add_file(record=r, fname=a.name, token=token, remote='/tmp/somefile.txt')92        with patch('dags.b2shareoperator.requests.patch') as p:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
