Best Python code snippet using localstack_python
dynamodbstreams_api.py
Source:dynamodbstreams_api.py  
...44            payload={"n": event_publisher.get_hash(table_name)},45        )46def get_stream_for_table(table_arn):47    region = DynamoDBStreamsBackend.get()48    table_name = table_name_from_stream_arn(table_arn)49    return region.ddb_streams.get(table_name)50def forward_events(records):51    kinesis = aws_stack.connect_to_service("kinesis")52    for record in records:53        if "SequenceNumber" not in record["dynamodb"]:54            record["dynamodb"]["SequenceNumber"] = str(55                DynamoDBStreamsBackend.SEQUENCE_NUMBER_COUNTER56            )57            DynamoDBStreamsBackend.SEQUENCE_NUMBER_COUNTER += 158        table_arn = record["eventSourceARN"]59        stream = get_stream_for_table(table_arn)60        if stream:61            table_name = table_name_from_stream_arn(stream["StreamArn"])62            stream_name = get_kinesis_stream_name(table_name)63            kinesis.put_record(StreamName=stream_name, Data=json.dumps(record), PartitionKey="TODO")64def delete_streams(table_arn):65    region = DynamoDBStreamsBackend.get()66    table_name = table_name_from_table_arn(table_arn)67    stream = region.ddb_streams.pop(table_name, None)68    if stream:69        stream_name = get_kinesis_stream_name(table_name)70        try:71            aws_stack.connect_to_service("kinesis").delete_stream(StreamName=stream_name)72            # sleep a bit, as stream deletion can take some time ...73            time.sleep(1)74        except Exception:75            pass  # ignore "stream not found" errors76@app.route("/", methods=["POST"])77def post_request():78    region = DynamoDBStreamsBackend.get()79    action = request.headers.get("x-amz-target", "")80    action = action.split(".")[-1]81    data = json.loads(to_str(request.data))82    result = {}83    kinesis = aws_stack.connect_to_service("kinesis")84    if action == "ListStreams":85        result = {"Streams": list(region.ddb_streams.values())}86    elif action == "DescribeStream":87        for stream in region.ddb_streams.values():88            if stream["StreamArn"] == data["StreamArn"]:89                result = {"StreamDescription": stream}90                # get stream details91                dynamodb = aws_stack.connect_to_service("dynamodb")92                table_name = table_name_from_stream_arn(stream["StreamArn"])93                stream_name = get_kinesis_stream_name(table_name)94                stream_details = kinesis.describe_stream(StreamName=stream_name)95                table_details = dynamodb.describe_table(TableName=table_name)96                stream["KeySchema"] = table_details["Table"]["KeySchema"]97                # Replace Kinesis ShardIDs with ones that mimic actual98                # DynamoDBStream ShardIDs.99                stream_shards = stream_details["StreamDescription"]["Shards"]100                for shard in stream_shards:101                    shard["ShardId"] = shard_id(stream_name, shard["ShardId"])102                stream["Shards"] = stream_shards103                break104        if not result:105            return error_response(106                "Requested resource not found", error_type="ResourceNotFoundException"107            )108    elif action == "GetShardIterator":109        # forward request to Kinesis API110        stream_name = stream_name_from_stream_arn(data["StreamArn"])111        stream_shard_id = kinesis_shard_id(data["ShardId"])112        kwargs = (113            {"StartingSequenceNumber": data["SequenceNumber"]} if data.get("SequenceNumber") else {}114        )115        result = kinesis.get_shard_iterator(116            StreamName=stream_name,117            ShardId=stream_shard_id,118            ShardIteratorType=data["ShardIteratorType"],119            **kwargs,120        )121    elif action == "GetRecords":122        kinesis_records = kinesis.get_records(**data)123        result = {124            "Records": [],125            "NextShardIterator": kinesis_records.get("NextShardIterator"),126        }127        for record in kinesis_records["Records"]:128            record_data = json.loads(to_str(record["Data"]))129            record_data["dynamodb"]["SequenceNumber"] = record["SequenceNumber"]130            result["Records"].append(record_data)131    else:132        print('WARNING: Unknown operation "%s"' % action)133    return jsonify(result)134# -----------------135# HELPER FUNCTIONS136# -----------------137def error_response(message=None, error_type=None, code=400):138    if not message:139        message = "Unknown error"140    if not error_type:141        error_type = "UnknownError"142    if "com.amazonaws.dynamodb" not in error_type:143        error_type = "com.amazonaws.dynamodb.v20120810#%s" % error_type144    content = {"message": message, "__type": error_type}145    return make_response(jsonify(content), code)146def get_kinesis_stream_name(table_name):147    return DDB_KINESIS_STREAM_NAME_PREFIX + table_name148def table_name_from_stream_arn(stream_arn):149    return stream_arn.split(":table/", 1)[-1].split("/")[0]150def table_name_from_table_arn(table_arn):151    return table_name_from_stream_arn(table_arn)152def stream_name_from_stream_arn(stream_arn):153    table_name = table_name_from_stream_arn(stream_arn)154    return get_kinesis_stream_name(table_name)155def shard_id(stream_arn, kinesis_shard_id):156    timestamp = str(int(now_utc()))157    timestamp = "%s00000000" % timestamp[:-5]158    timestamp = "%s%s" % ("0" * (20 - len(timestamp)), timestamp)159    suffix = kinesis_shard_id.replace("shardId-", "")[:32]160    return "shardId-%s-%s" % (timestamp, suffix)161def kinesis_shard_id(dynamodbstream_shard_id):162    shard_params = dynamodbstream_shard_id.rsplit("-")163    return "{0}-{1}".format(shard_params[0], shard_params[-1])164def serve(port, quiet=True):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
