Best Python code snippet using localstack_python
dynamodb_listener.py
Source:dynamodb_listener.py  
...240        elif action == "EnableKinesisStreamingDestination":241            # Check if table exists, to avoid error log output from DynamoDBLocal242            if not self.table_exists(ddb_client, data["TableName"]):243                return get_table_not_found_error()244            stream = is_kinesis_stream_exists(stream_arn=data["StreamArn"])245            if not stream:246                return error_response(247                    error_type="ValidationException",248                    message="User does not have a permission to use kinesis stream",249                )250            return dynamodb_enable_kinesis_streaming_destination(data, table_def)251        elif action == "DisableKinesisStreamingDestination":252            # Check if table exists, to avoid error log output from DynamoDBLocal253            if not self.table_exists(ddb_client, data["TableName"]):254                return get_table_not_found_error()255            stream = is_kinesis_stream_exists(stream_arn=data["StreamArn"])256            if not stream:257                return error_response(258                    error_type="ValidationException",259                    message="User does not have a permission to use kinesis stream",260                )261            return dynamodb_disable_kinesis_streaming_destination(data, table_def)262        elif action == "DescribeKinesisStreamingDestination":263            # Check if table exists, to avoid error log output from DynamoDBLocal264            if not self.table_exists(ddb_client, data["TableName"]):265                return get_table_not_found_error()266            response = aws_responses.requests_response(267                {268                    "KinesisDataStreamDestinations": table_def.get("KinesisDataStreamDestinations")269                    or [],270                    "TableName": data["TableName"],271                }272            )273            return response274        return Request(data=data_orig, method=method, headers=headers)275    def return_response(self, method, path, data, headers, response):276        if path.startswith("/shell") or method == "GET":277            return278        data = json.loads(to_str(data))279        # update table definitions280        if data and "TableName" in data and "KeySchema" in data:281            table_definitions = DynamoDBRegion.get().table_definitions282            table_definitions[data["TableName"]] = data283        if response._content:284            # fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)285            content_replaced = re.sub(286                r'("TableArn"|"LatestStreamArn"|"StreamArn")\s*:\s*"arn:aws:dynamodb:ddblocal:([^"]+)"',287                r'\1: "arn:aws:dynamodb:%s:\2"' % aws_stack.get_region(),288                to_str(response._content),289            )290            if content_replaced != response._content:291                response._content = content_replaced292                fix_headers_for_updated_response(response)293        action = headers.get("X-Amz-Target", "")294        action = action.replace(ACTION_PREFIX, "")295        if not action:296            return297        # upgrade event version to 1.1298        record = {299            "eventID": "1",300            "eventVersion": "1.1",301            "dynamodb": {302                "ApproximateCreationDateTime": time.time(),303                # 'StreamViewType': 'NEW_AND_OLD_IMAGES',304                "SizeBytes": -1,305            },306            "awsRegion": aws_stack.get_region(),307            "eventSource": "aws:dynamodb",308        }309        records = [record]310        streams_enabled_cache = {}311        table_name = data.get("TableName")312        event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(313            table_name, streams_enabled_cache314        )315        if action == "UpdateItem":316            if response.status_code == 200 and event_sources_or_streams_enabled:317                existing_item = self._thread_local("existing_item")318                record["eventName"] = "INSERT" if not existing_item else "MODIFY"319                record["eventID"] = short_uid()320                updated_item = find_existing_item(data)321                if not updated_item:322                    return323                record["dynamodb"]["Keys"] = data["Key"]324                if existing_item:325                    record["dynamodb"]["OldImage"] = existing_item326                record["dynamodb"]["NewImage"] = updated_item327                record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))328                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)329                if stream_spec:330                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]331        elif action == "BatchWriteItem":332            records, unprocessed_items = self.prepare_batch_write_item_records(record, data)333            for record in records:334                event_sources_or_streams_enabled = (335                    event_sources_or_streams_enabled336                    or has_event_sources_or_streams_enabled(337                        record["eventSourceARN"], streams_enabled_cache338                    )339                )340            if response.status_code == 200 and any(unprocessed_items):341                content = json.loads(to_str(response.content))342                table_name = list(data["RequestItems"].keys())[0]343                if table_name not in content["UnprocessedItems"]:344                    content["UnprocessedItems"][table_name] = []345                for key in ["PutRequest", "DeleteRequest"]:346                    if any(unprocessed_items[key]):347                        content["UnprocessedItems"][table_name].append(348                            {key: unprocessed_items[key]}349                        )350                unprocessed = content["UnprocessedItems"]351                for key in list(unprocessed.keys()):352                    if not unprocessed.get(key):353                        del unprocessed[key]354                response._content = json.dumps(content)355                fix_headers_for_updated_response(response)356        elif action == "TransactWriteItems":357            records = self.prepare_transact_write_item_records(record, data)358            for record in records:359                event_sources_or_streams_enabled = (360                    event_sources_or_streams_enabled361                    or has_event_sources_or_streams_enabled(362                        record["eventSourceARN"], streams_enabled_cache363                    )364                )365        elif action == "PutItem":366            if response.status_code == 200:367                keys = dynamodb_extract_keys(item=data["Item"], table_name=table_name)368                if isinstance(keys, Response):369                    return keys370                # fix response371                if response._content == "{}":372                    response._content = update_put_item_response_content(data, response._content)373                    fix_headers_for_updated_response(response)374                if event_sources_or_streams_enabled:375                    existing_item = self._thread_local("existing_item")376                    # Get stream specifications details for the table377                    stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)378                    record["eventName"] = "INSERT" if not existing_item else "MODIFY"379                    # prepare record keys380                    record["dynamodb"]["Keys"] = keys381                    record["dynamodb"]["NewImage"] = data["Item"]382                    record["dynamodb"]["SizeBytes"] = len(json.dumps(data["Item"]))383                    record["eventID"] = short_uid()384                    if stream_spec:385                        record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]386                    if existing_item:387                        record["dynamodb"]["OldImage"] = existing_item388        elif action in ("GetItem", "Query"):389            if response.status_code == 200:390                content = json.loads(to_str(response.content))391                # make sure we append 'ConsumedCapacity', which is properly392                # returned by dynalite, but not by AWS's DynamoDBLocal393                if "ConsumedCapacity" not in content and data.get("ReturnConsumedCapacity") in [394                    "TOTAL",395                    "INDEXES",396                ]:397                    content["ConsumedCapacity"] = {398                        "TableName": table_name,399                        "CapacityUnits": 5,  # TODO hardcoded400                        "ReadCapacityUnits": 2,401                        "WriteCapacityUnits": 3,402                    }403                    response._content = json.dumps(content)404                    fix_headers_for_updated_response(response)405        elif action == "DeleteItem":406            if response.status_code == 200 and event_sources_or_streams_enabled:407                old_item = self._thread_local("existing_item")408                record["eventID"] = short_uid()409                record["eventName"] = "REMOVE"410                record["dynamodb"]["Keys"] = data["Key"]411                record["dynamodb"]["OldImage"] = old_item412                record["dynamodb"]["SizeBytes"] = len(json.dumps(old_item))413                # Get stream specifications details for the table414                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)415                if stream_spec:416                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]417        elif action == "CreateTable":418            if response.status_code == 200:419                table_definitions = (420                    DynamoDBRegion.get().table_definitions.get(data["TableName"]) or {}421                )422                if "TableId" not in table_definitions:423                    table_definitions["TableId"] = long_uid()424                if "SSESpecification" in table_definitions:425                    sse_specification = table_definitions.pop("SSESpecification")426                    table_definitions["SSEDescription"] = get_sse_description(sse_specification)427                content = json.loads(to_str(response.content))428                if table_definitions:429                    table_content = content.get("Table", {})430                    table_content.update(table_definitions)431                    content["TableDescription"].update(table_content)432                    update_response_content(response, content)433                if "StreamSpecification" in data:434                    create_dynamodb_stream(435                        data, content["TableDescription"].get("LatestStreamLabel")436                    )437                if data.get("Tags"):438                    table_arn = content["TableDescription"]["TableArn"]439                    DynamoDBRegion.TABLE_TAGS[table_arn] = {440                        tag["Key"]: tag["Value"] for tag in data["Tags"]441                    }442            event_publisher.fire_event(443                event_publisher.EVENT_DYNAMODB_CREATE_TABLE,444                payload={"n": event_publisher.get_hash(table_name)},445            )446            return447        elif action == "DeleteTable":448            if response.status_code == 200:449                table_arn = (450                    json.loads(response._content).get("TableDescription", {}).get("TableArn")451                )452                event_publisher.fire_event(453                    event_publisher.EVENT_DYNAMODB_DELETE_TABLE,454                    payload={"n": event_publisher.get_hash(table_name)},455                )456                self.delete_all_event_source_mappings(table_arn)457                dynamodbstreams_api.delete_streams(table_arn)458                DynamoDBRegion.TABLE_TAGS.pop(table_arn, None)459            return460        elif action == "UpdateTable":461            content_str = to_str(response._content or "")462            if response.status_code == 200 and "StreamSpecification" in data:463                content = json.loads(content_str)464                create_dynamodb_stream(data, content["TableDescription"].get("LatestStreamLabel"))465            if (466                response.status_code >= 400467                and data.get("ReplicaUpdates")468                and "Nothing to update" in content_str469            ):470                table_name = data.get("TableName")471                # update local table props (replicas)472                table_properties = DynamoDBRegion.get().table_properties473                table_properties[table_name] = table_props = table_properties.get(table_name) or {}474                table_props["Replicas"] = replicas = table_props.get("Replicas") or []475                for repl_update in data["ReplicaUpdates"]:476                    for key, details in repl_update.items():477                        region = details.get("RegionName")478                        if key == "Create":479                            details["ReplicaStatus"] = details.get("ReplicaStatus") or "ACTIVE"480                            replicas.append(details)481                        if key == "Update":482                            replica = [r for r in replicas if r.get("RegionName") == region]483                            if replica:484                                replica[0].update(details)485                        if key == "Delete":486                            table_props["Replicas"] = [487                                r for r in replicas if r.get("RegionName") != region488                            ]489                # update response content490                schema = get_table_schema(table_name)491                result = {"TableDescription": schema["Table"]}492                update_response_content(response, json_safe(result), 200)493            return494        elif action == "DescribeTable":495            table_name = data.get("TableName")496            table_props = DynamoDBRegion.get().table_properties.get(table_name)497            if table_props:498                content = json.loads(to_str(response.content))499                content.get("Table", {}).update(table_props)500                update_response_content(response, content)501            # Update only TableId and SSEDescription if present502            table_definitions = DynamoDBRegion.get().table_definitions.get(table_name)503            if table_definitions:504                for key in ["TableId", "SSEDescription"]:505                    if table_definitions.get(key):506                        content = json.loads(to_str(response.content))507                        content.get("Table", {})[key] = table_definitions[key]508                        update_response_content(response, content)509        elif action == "TagResource":510            table_arn = data["ResourceArn"]511            table_tags = DynamoDBRegion.TABLE_TAGS512            if table_arn not in table_tags:513                table_tags[table_arn] = {}514            table_tags[table_arn].update({tag["Key"]: tag["Value"] for tag in data.get("Tags", [])})515            return516        elif action == "UntagResource":517            table_arn = data["ResourceArn"]518            for tag_key in data.get("TagKeys", []):519                DynamoDBRegion.TABLE_TAGS.get(table_arn, {}).pop(tag_key, None)520            return521        else:522            # nothing to do523            return524        if event_sources_or_streams_enabled and records and "eventName" in records[0]:525            if "TableName" in data:526                records[0]["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)527            # forward to kinesis stream528            records_to_kinesis = copy.deepcopy(records)529            forward_to_kinesis_stream(records_to_kinesis)530            # forward to lambda and ddb_streams531            forward_to_lambda(records)532            records = self.prepare_records_to_forward_to_ddb_stream(records)533            forward_to_ddb_stream(records)534    # -------------535    # UTIL METHODS536    # -------------537    def prepare_request_headers(self, headers):538        # Note: We need to ensure that the same access key is used here for all requests,539        # otherwise DynamoDBLocal stores tables/items in separate namespaces540        headers["Authorization"] = re.sub(541            r"Credential=[^/]+/",542            r"Credential=%s/" % constants.TEST_AWS_ACCESS_KEY_ID,543            headers.get("Authorization") or "",544        )545    def prepare_batch_write_item_records(self, record, data):546        records = []547        unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}548        i = 0549        for table_name in sorted(data["RequestItems"].keys()):550            # Add stream view type to record if ddb stream is enabled551            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)552            if stream_spec:553                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]554            for request in data["RequestItems"][table_name]:555                put_request = request.get("PutRequest")556                existing_items = self._thread_local("existing_items")557                if put_request:558                    if existing_items and len(existing_items) > i:559                        existing_item = existing_items[i]560                        keys = dynamodb_extract_keys(561                            item=put_request["Item"], table_name=table_name562                        )563                        if isinstance(keys, Response):564                            return keys565                        new_record = clone(record)566                        new_record["eventID"] = short_uid()567                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))568                        new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"569                        new_record["dynamodb"]["Keys"] = keys570                        new_record["dynamodb"]["NewImage"] = put_request["Item"]571                        if existing_item:572                            new_record["dynamodb"]["OldImage"] = existing_item573                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)574                        records.append(new_record)575                    unprocessed_put_items = self._thread_local("unprocessed_put_items")576                    if unprocessed_put_items and len(unprocessed_put_items) > i:577                        unprocessed_item = unprocessed_put_items[i]578                        if unprocessed_item:579                            unprocessed_items["PutRequest"].update(580                                json.loads(json.dumps(unprocessed_item))581                            )582                delete_request = request.get("DeleteRequest")583                if delete_request:584                    if existing_items and len(existing_items) > i:585                        keys = delete_request["Key"]586                        if isinstance(keys, Response):587                            return keys588                        new_record = clone(record)589                        new_record["eventID"] = short_uid()590                        new_record["eventName"] = "REMOVE"591                        new_record["dynamodb"]["Keys"] = keys592                        new_record["dynamodb"]["OldImage"] = existing_items[i]593                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))594                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)595                        records.append(new_record)596                    unprocessed_delete_items = self._thread_local("unprocessed_delete_items")597                    if unprocessed_delete_items and len(unprocessed_delete_items) > i:598                        unprocessed_item = unprocessed_delete_items[i]599                        if unprocessed_item:600                            unprocessed_items["DeleteRequest"].update(601                                json.loads(json.dumps(unprocessed_item))602                            )603                i += 1604        return records, unprocessed_items605    def prepare_transact_write_item_records(self, record, data):606        records = []607        # Fix issue #2745: existing_items only contain the Put/Update/Delete records,608        # so we will increase the index based on these events609        i = 0610        for request in data["TransactItems"]:611            put_request = request.get("Put")612            if put_request:613                existing_item = self._thread_local("existing_items")[i]614                table_name = put_request["TableName"]615                keys = dynamodb_extract_keys(item=put_request["Item"], table_name=table_name)616                if isinstance(keys, Response):617                    return keys618                # Add stream view type to record if ddb stream is enabled619                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)620                if stream_spec:621                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]622                new_record = clone(record)623                new_record["eventID"] = short_uid()624                new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"625                new_record["dynamodb"]["Keys"] = keys626                new_record["dynamodb"]["NewImage"] = put_request["Item"]627                if existing_item:628                    new_record["dynamodb"]["OldImage"] = existing_item629                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)630                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))631                records.append(new_record)632                i += 1633            update_request = request.get("Update")634            if update_request:635                table_name = update_request["TableName"]636                keys = update_request["Key"]637                if isinstance(keys, Response):638                    return keys639                updated_item = find_existing_item(update_request, table_name)640                if not updated_item:641                    return []642                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)643                if stream_spec:644                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]645                new_record = clone(record)646                new_record["eventID"] = short_uid()647                new_record["eventName"] = "MODIFY"648                new_record["dynamodb"]["Keys"] = keys649                new_record["dynamodb"]["OldImage"] = self._thread_local("existing_items")[i]650                new_record["dynamodb"]["NewImage"] = updated_item651                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)652                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))653                records.append(new_record)654                i += 1655            delete_request = request.get("Delete")656            if delete_request:657                table_name = delete_request["TableName"]658                keys = delete_request["Key"]659                existing_item = self._thread_local("existing_items")[i]660                if isinstance(keys, Response):661                    return keys662                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)663                if stream_spec:664                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]665                new_record = clone(record)666                new_record["eventID"] = short_uid()667                new_record["eventName"] = "REMOVE"668                new_record["dynamodb"]["Keys"] = keys669                new_record["dynamodb"]["OldImage"] = existing_item670                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))671                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)672                records.append(new_record)673                i += 1674        return records675    def prepare_records_to_forward_to_ddb_stream(self, records):676        # StreamViewType determines what information is written to the stream for the table677        # When an item in the table is inserted, updated or deleted678        for record in records:679            if record["dynamodb"].get("StreamViewType"):680                # KEYS_ONLY  - Only the key attributes of the modified item are written to the stream681                if record["dynamodb"]["StreamViewType"] == "KEYS_ONLY":682                    record["dynamodb"].pop("OldImage", None)683                    record["dynamodb"].pop("NewImage", None)684                # NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream685                elif record["dynamodb"]["StreamViewType"] == "NEW_IMAGE":686                    record["dynamodb"].pop("OldImage", None)687                # OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream688                elif record["dynamodb"]["StreamViewType"] == "OLD_IMAGE":689                    record["dynamodb"].pop("NewImage", None)690        return records691    def delete_all_event_source_mappings(self, table_arn):692        if table_arn:693            # fix start dynamodb service without lambda694            if not is_api_enabled("lambda"):695                return696            lambda_client = aws_stack.connect_to_service("lambda")697            result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)698            for event in result["EventSourceMappings"]:699                event_source_mapping_id = event["UUID"]700                lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)701    @staticmethod702    def _thread_local(name, default=None):703        try:704            return getattr(ProxyListenerDynamoDB.thread_local, name)705        except AttributeError:706            return default707def get_sse_kms_managed_key():708    if MANAGED_KMS_KEYS.get(aws_stack.get_region()):709        return MANAGED_KMS_KEYS[aws_stack.get_region()]710    kms_client = aws_stack.connect_to_service("kms")711    key_data = kms_client.create_key(Description="Default key that protects DynamoDB data")712    key_id = key_data["KeyMetadata"]["KeyId"]713    # not really happy with this import here714    from localstack.services.kms import kms_listener715    kms_listener.set_key_managed(key_id)716    MANAGED_KMS_KEYS[aws_stack.get_region()] = key_id717    return key_id718def get_sse_description(data):719    if data.get("Enabled"):720        kms_master_key_id = data.get("KMSMasterKeyId")721        if not kms_master_key_id:722            # this is of course not the actual key for dynamodb, just a better, since existing, mock723            kms_master_key_id = get_sse_kms_managed_key()724        kms_master_key_id = aws_stack.kms_key_arn(kms_master_key_id)725        return {726            "Status": "ENABLED",727            "SSEType": "KMS",  # no other value is allowed here728            "KMSMasterKeyArn": kms_master_key_id,729        }730    return {}731def handle_special_request(method, path, data, headers):732    if path.startswith("/shell") or method == "GET":733        if path == "/shell":734            headers = {"Refresh": "0; url=%s/shell/" % config.TEST_DYNAMODB_URL}735            return aws_responses.requests_response("", headers=headers)736        return True737    if method == "OPTIONS":738        return 200739def create_global_table(data):740    table_name = data["GlobalTableName"]741    if table_name in DynamoDBRegion.GLOBAL_TABLES:742        return get_error_message(743            "Global Table with this name already exists",744            "GlobalTableAlreadyExistsException",745        )746    DynamoDBRegion.GLOBAL_TABLES[table_name] = data747    for group in data.get("ReplicationGroup", []):748        group["ReplicaStatus"] = "ACTIVE"749        group["ReplicaStatusDescription"] = "Replica active"750    result = {"GlobalTableDescription": data}751    return result752def describe_global_table(data):753    table_name = data["GlobalTableName"]754    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)755    if not details:756        return get_error_message(757            "Global Table with this name does not exist", "GlobalTableNotFoundException"758        )759    result = {"GlobalTableDescription": details}760    return result761def list_global_tables(data):762    result = [763        select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])764        for tab in DynamoDBRegion.GLOBAL_TABLES.values()765    ]766    result = {"GlobalTables": result}767    return result768def update_global_table(data):769    table_name = data["GlobalTableName"]770    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)771    if not details:772        return get_error_message(773            "Global Table with this name does not exist", "GlobalTableNotFoundException"774        )775    for update in data.get("ReplicaUpdates", []):776        repl_group = details["ReplicationGroup"]777        # delete existing778        delete = update.get("Delete")779        if delete:780            details["ReplicationGroup"] = [781                g for g in repl_group if g["RegionName"] != delete["RegionName"]782            ]783        # create new784        create = update.get("Create")785        if create:786            exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]787            if exists:788                continue789            new_group = {790                "RegionName": create["RegionName"],791                "ReplicaStatus": "ACTIVE",792                "ReplicaStatusDescription": "Replica active",793            }794            details["ReplicationGroup"].append(new_group)795    result = {"GlobalTableDescription": details}796    return result797def is_index_query_valid(table_name, index_query_type):798    schema = get_table_schema(table_name)799    for index in schema["Table"].get("GlobalSecondaryIndexes", []):800        index_projection_type = index.get("Projection").get("ProjectionType")801        if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":802            return False803    return True804def has_event_sources_or_streams_enabled(table_name, cache={}):805    if not table_name:806        return807    table_arn = aws_stack.dynamodb_table_arn(table_name)808    cached = cache.get(table_arn)809    if isinstance(cached, bool):810        return cached811    sources = lambda_api.get_event_sources(source_arn=table_arn)812    result = False813    if sources:814        result = True815    if not result and dynamodbstreams_api.get_stream_for_table(table_arn):816        result = True817    cache[table_arn] = result818    # if kinesis streaming destination is enabled819    # get table name from table_arn820    # since batch_wrtie and transact write operations passing table_arn instead of table_name821    table_name = table_arn.split("/", 1)[-1]822    table_definitions = DynamoDBRegion.get().table_definitions823    if not result and table_definitions.get(table_name):824        if table_definitions[table_name].get("KinesisDataStreamDestinationStatus") == "ACTIVE":825            result = True826    return result827def get_table_schema(table_name):828    key = "%s/%s" % (aws_stack.get_region(), table_name)829    schema = SCHEMA_CACHE.get(key)830    if not schema:831        ddb_client = aws_stack.connect_to_service("dynamodb")832        schema = ddb_client.describe_table(TableName=table_name)833        SCHEMA_CACHE[key] = schema834    return schema835def find_existing_item(put_item, table_name=None):836    table_name = table_name or put_item["TableName"]837    ddb_client = aws_stack.connect_to_service("dynamodb")838    search_key = {}839    if "Key" in put_item:840        search_key = put_item["Key"]841    else:842        schema = get_table_schema(table_name)843        schemas = [schema["Table"]["KeySchema"]]844        for index in schema["Table"].get("GlobalSecondaryIndexes", []):845            # TODO846            # schemas.append(index['KeySchema'])847            pass848        for schema in schemas:849            for key in schema:850                key_name = key["AttributeName"]851                search_key[key_name] = put_item["Item"][key_name]852        if not search_key:853            return854    req = {"TableName": table_name, "Key": search_key}855    existing_item = aws_stack.dynamodb_get_item_raw(req)856    if not existing_item:857        return existing_item858    if "Item" not in existing_item:859        if "message" in existing_item:860            table_names = ddb_client.list_tables()["TableNames"]861            msg = "Unable to get item from DynamoDB (existing tables: %s ...truncated if >100 tables): %s" % (862                table_names,863                existing_item["message"],864            )865            LOGGER.warning(msg)866        return867    return existing_item.get("Item")868def get_error_message(message, error_type):869    response = error_response(message=message, error_type=error_type)870    fix_headers_for_updated_response(response)871    return response872def get_table_not_found_error():873    return get_error_message(874        message="Cannot do operations on a non-existent table",875        error_type="ResourceNotFoundException",876    )877def fix_headers_for_updated_response(response):878    response.headers["Content-Length"] = len(to_bytes(response.content))879    response.headers["x-amz-crc32"] = calculate_crc32(response)880def update_response_content(response, content, status_code=None):881    aws_responses.set_response_content(response, content)882    if status_code:883        response.status_code = status_code884    fix_headers_for_updated_response(response)885def update_put_item_response_content(data, response_content):886    # when return-values variable is set only then attribute data should be returned887    # in the response otherwise by default is should not return any data.888    # https://github.com/localstack/localstack/issues/2121889    if data.get("ReturnValues"):890        response_content = json.dumps({"Attributes": data["Item"]})891    return response_content892def calculate_crc32(response):893    return crc32(to_bytes(response.content)) & 0xFFFFFFFF894def create_dynamodb_stream(data, latest_stream_label):895    stream = data["StreamSpecification"]896    enabled = stream.get("StreamEnabled")897    if enabled not in [False, "False"]:898        table_name = data["TableName"]899        view_type = stream["StreamViewType"]900        dynamodbstreams_api.add_dynamodb_stream(901            table_name=table_name,902            latest_stream_label=latest_stream_label,903            view_type=view_type,904            enabled=enabled,905        )906def forward_to_lambda(records):907    for record in records:908        sources = lambda_api.get_event_sources(source_arn=record["eventSourceARN"])909        event = {"Records": [record]}910        for src in sources:911            if src.get("State") != "Enabled":912                continue913            lambda_api.run_lambda(914                func_arn=src["FunctionArn"],915                event=event,916                context={},917                asynchronous=not config.SYNCHRONOUS_DYNAMODB_EVENTS,918            )919def forward_to_ddb_stream(records):920    dynamodbstreams_api.forward_events(records)921def forward_to_kinesis_stream(records):922    kinesis = aws_stack.connect_to_service("kinesis")923    table_definitions = DynamoDBRegion.get().table_definitions924    for record in records:925        if record.get("eventSourceARN"):926            table_name = record["eventSourceARN"].split("/", 1)[-1]927            table_def = table_definitions.get(table_name) or {}928            if table_def.get("KinesisDataStreamDestinationStatus") == "ACTIVE":929                stream_name = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"].split(930                    "/", 1931                )[-1]932                record["tableName"] = table_name933                record.pop("eventSourceARN", None)934                record["dynamodb"].pop("StreamViewType", None)935                partition_key = list(936                    filter(lambda key: key["KeyType"] == "HASH", table_def["KeySchema"])937                )[0]["AttributeName"]938                kinesis.put_record(939                    StreamName=stream_name,940                    Data=json.dumps(record),941                    PartitionKey=partition_key,942                )943def dynamodb_extract_keys(item, table_name):944    result = {}945    table_definitions = DynamoDBRegion.get().table_definitions946    if table_name not in table_definitions:947        LOGGER.warning("Unknown table: %s not found in %s" % (table_name, table_definitions))948        return None949    for key in table_definitions[table_name]["KeySchema"]:950        attr_name = key["AttributeName"]951        if attr_name not in item:952            return error_response(953                error_type="ValidationException",954                message="One of the required keys was not given a value",955            )956        result[attr_name] = item[attr_name]957    return result958def dynamodb_get_table_stream_specification(table_name):959    try:960        return get_table_schema(table_name)["Table"].get("StreamSpecification")961    except Exception as e:962        LOGGER.info(963            "Unable to get stream specification for table %s : %s %s"964            % (table_name, e, traceback.format_exc())965        )966        raise e967def is_kinesis_stream_exists(stream_arn):968    # connect to kinesis969    kinesis = aws_stack.connect_to_service("kinesis")970    stream_name_from_arn = stream_arn.split("/", 1)[1]971    # check if the stream exists in kinesis for the user972    filtered = list(973        filter(974            lambda stream_name: stream_name == stream_name_from_arn,975            kinesis.list_streams()["StreamNames"],976        )977    )978    return bool(filtered)979def dynamodb_enable_kinesis_streaming_destination(data, table_def):980    if table_def.get("KinesisDataStreamDestinationStatus") in [981        "DISABLED",...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
