How to use the forward_to_kinesis_stream method in localstack

Best Python code snippet using localstack_python

dynamodb_listener.py

Source: dynamodb_listener.py (GitHub)


...

        if "TableName" in data:
            records[0]["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
        # forward to kinesis stream
        records_to_kinesis = copy.deepcopy(records)
        forward_to_kinesis_stream(records_to_kinesis)
        # forward to lambda and ddb_streams
        forward_to_lambda(records)
        records = self.prepare_records_to_forward_to_ddb_stream(records)
        forward_to_ddb_stream(records)

    # -------------
    # UTIL METHODS
    # -------------

    def prepare_request_headers(self, headers):
        # Note: We need to ensure that the same access key is used here for all requests,
        # otherwise DynamoDBLocal stores tables/items in separate namespaces
        headers["Authorization"] = re.sub(
            r"Credential=[^/]+/",
            r"Credential=%s/" % constants.TEST_AWS_ACCESS_KEY_ID,
            headers.get("Authorization") or "",
        )

    def prepare_batch_write_item_records(self, record, data):
        records = []
        unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}
        i = 0
        for table_name in sorted(data["RequestItems"].keys()):
            # Add stream view type to record if ddb stream is enabled
            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
            if stream_spec:
                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
            for request in data["RequestItems"][table_name]:
                put_request = request.get("PutRequest")
                existing_items = self._thread_local("existing_items")
                if put_request:
                    if existing_items and len(existing_items) > i:
                        existing_item = existing_items[i]
                        keys = dynamodb_extract_keys(
                            item=put_request["Item"], table_name=table_name
                        )
                        if isinstance(keys, Response):
                            return keys
                        new_record = clone(record)
                        new_record["eventID"] = short_uid()
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                        new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["NewImage"] = put_request["Item"]
                        if existing_item:
                            new_record["dynamodb"]["OldImage"] = existing_item
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    unprocessed_put_items = self._thread_local("unprocessed_put_items")
                    if unprocessed_put_items and len(unprocessed_put_items) > i:
                        unprocessed_item = unprocessed_put_items[i]
                        if unprocessed_item:
                            unprocessed_items["PutRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                delete_request = request.get("DeleteRequest")
                if delete_request:
                    if existing_items and len(existing_items) > i:
                        keys = delete_request["Key"]
                        if isinstance(keys, Response):
                            return keys
                        new_record = clone(record)
                        new_record["eventID"] = short_uid()
                        new_record["eventName"] = "REMOVE"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["OldImage"] = existing_items[i]
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    unprocessed_delete_items = self._thread_local("unprocessed_delete_items")
                    if unprocessed_delete_items and len(unprocessed_delete_items) > i:
                        unprocessed_item = unprocessed_delete_items[i]
                        if unprocessed_item:
                            unprocessed_items["DeleteRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                i += 1
        return records, unprocessed_items

    def prepare_transact_write_item_records(self, record, data):
        records = []
        # Fix issue #2745: existing_items only contain the Put/Update/Delete records,
        # so we will increase the index based on these events
        i = 0
        for request in data["TransactItems"]:
            put_request = request.get("Put")
            if put_request:
                existing_item = self._thread_local("existing_items")[i]
                table_name = put_request["TableName"]
                keys = dynamodb_extract_keys(item=put_request["Item"], table_name=table_name)
                if isinstance(keys, Response):
                    return keys
                # Add stream view type to record if ddb stream is enabled
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = clone(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["NewImage"] = put_request["Item"]
                if existing_item:
                    new_record["dynamodb"]["OldImage"] = existing_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                records.append(new_record)
                i += 1
            update_request = request.get("Update")
            if update_request:
                table_name = update_request["TableName"]
                keys = update_request["Key"]
                if isinstance(keys, Response):
                    return keys
                updated_item = find_existing_item(update_request, table_name)
                if not updated_item:
                    return []
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = clone(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = self._thread_local("existing_items")[i]
                new_record["dynamodb"]["NewImage"] = updated_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))
                records.append(new_record)
                i += 1
            delete_request = request.get("Delete")
            if delete_request:
                table_name = delete_request["TableName"]
                keys = delete_request["Key"]
                existing_item = self._thread_local("existing_items")[i]
                if isinstance(keys, Response):
                    return keys
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = clone(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "REMOVE"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = existing_item
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                records.append(new_record)
                i += 1
        return records

    def prepare_records_to_forward_to_ddb_stream(self, records):
        # StreamViewType determines what information is written to the stream for the table
        # When an item in the table is inserted, updated or deleted
        for record in records:
            if record["dynamodb"].get("StreamViewType"):
                # KEYS_ONLY - Only the key attributes of the modified item are written to the stream
                if record["dynamodb"]["StreamViewType"] == "KEYS_ONLY":
                    record["dynamodb"].pop("OldImage", None)
                    record["dynamodb"].pop("NewImage", None)
                # NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream
                elif record["dynamodb"]["StreamViewType"] == "NEW_IMAGE":
                    record["dynamodb"].pop("OldImage", None)
                # OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream
                elif record["dynamodb"]["StreamViewType"] == "OLD_IMAGE":
                    record["dynamodb"].pop("NewImage", None)
        return records

    def delete_all_event_source_mappings(self, table_arn):
        if table_arn:
            # fix start dynamodb service without lambda
            if not is_api_enabled("lambda"):
                return
            lambda_client = aws_stack.connect_to_service("lambda")
            result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)
            for event in result["EventSourceMappings"]:
                event_source_mapping_id = event["UUID"]
                lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)

    @staticmethod
    def _thread_local(name, default=None):
        try:
            return getattr(ProxyListenerDynamoDB.thread_local, name)
        except AttributeError:
            return default


def get_sse_kms_managed_key():
    if MANAGED_KMS_KEYS.get(aws_stack.get_region()):
        return MANAGED_KMS_KEYS[aws_stack.get_region()]
    kms_client = aws_stack.connect_to_service("kms")
    key_data = kms_client.create_key(Description="Default key that protects DynamoDB data")
    key_id = key_data["KeyMetadata"]["KeyId"]
    # not really happy with this import here
    from localstack.services.kms import kms_listener

    kms_listener.set_key_managed(key_id)
    MANAGED_KMS_KEYS[aws_stack.get_region()] = key_id
    return key_id


def get_sse_description(data):
    if data.get("Enabled"):
        kms_master_key_id = data.get("KMSMasterKeyId")
        if not kms_master_key_id:
            # this is of course not the actual key for dynamodb, just a better, since existing, mock
            kms_master_key_id = get_sse_kms_managed_key()
        kms_master_key_id = aws_stack.kms_key_arn(kms_master_key_id)
        return {
            "Status": "ENABLED",
            "SSEType": "KMS",  # no other value is allowed here
            "KMSMasterKeyArn": kms_master_key_id,
        }
    return {}


def handle_special_request(method, path, data, headers):
    if path.startswith("/shell") or method == "GET":
        if path == "/shell":
            headers = {"Refresh": "0; url=%s/shell/" % config.TEST_DYNAMODB_URL}
            return aws_responses.requests_response("", headers=headers)
        return True
    if method == "OPTIONS":
        return 200


def create_global_table(data):
    table_name = data["GlobalTableName"]
    if table_name in DynamoDBRegion.GLOBAL_TABLES:
        return get_error_message(
            "Global Table with this name already exists",
            "GlobalTableAlreadyExistsException",
        )
    DynamoDBRegion.GLOBAL_TABLES[table_name] = data
    for group in data.get("ReplicationGroup", []):
        group["ReplicaStatus"] = "ACTIVE"
        group["ReplicaStatusDescription"] = "Replica active"
    result = {"GlobalTableDescription": data}
    return result


def describe_global_table(data):
    table_name = data["GlobalTableName"]
    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)
    if not details:
        return get_error_message(
            "Global Table with this name does not exist", "GlobalTableNotFoundException"
        )
    result = {"GlobalTableDescription": details}
    return result


def list_global_tables(data):
    result = [
        select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])
        for tab in DynamoDBRegion.GLOBAL_TABLES.values()
    ]
    result = {"GlobalTables": result}
    return result


def update_global_table(data):
    table_name = data["GlobalTableName"]
    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)
    if not details:
        return get_error_message(
            "Global Table with this name does not exist", "GlobalTableNotFoundException"
        )
    for update in data.get("ReplicaUpdates", []):
        repl_group = details["ReplicationGroup"]
        # delete existing
        delete = update.get("Delete")
        if delete:
            details["ReplicationGroup"] = [
                g for g in repl_group if g["RegionName"] != delete["RegionName"]
            ]
        # create new
        create = update.get("Create")
        if create:
            exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]
            if exists:
                continue
            new_group = {
                "RegionName": create["RegionName"],
                "ReplicaStatus": "ACTIVE",
                "ReplicaStatusDescription": "Replica active",
            }
            details["ReplicationGroup"].append(new_group)
    result = {"GlobalTableDescription": details}
    return result


def is_index_query_valid(table_name, index_query_type):
    schema = get_table_schema(table_name)
    for index in schema["Table"].get("GlobalSecondaryIndexes", []):
        index_projection_type = index.get("Projection").get("ProjectionType")
        if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":
            return False
    return True


def has_event_sources_or_streams_enabled(table_name, cache={}):
    if not table_name:
        return
    table_arn = aws_stack.dynamodb_table_arn(table_name)
    cached = cache.get(table_arn)
    if isinstance(cached, bool):
        return cached
    sources = lambda_api.get_event_sources(source_arn=table_arn)
    result = False
    if sources:
        result = True
    if not result and dynamodbstreams_api.get_stream_for_table(table_arn):
        result = True
    cache[table_arn] = result
    # if kinesis streaming destination is enabled
    # get table name from table_arn
    # since batch_write and transact write operations passing table_arn instead of table_name
    table_name = table_arn.split("/", 1)[-1]
    table_definitions = DynamoDBRegion.get().table_definitions
    if not result and table_definitions.get(table_name):
        if table_definitions[table_name].get("KinesisDataStreamDestinationStatus") == "ACTIVE":
            result = True
    return result


def get_table_schema(table_name):
    key = "%s/%s" % (aws_stack.get_region(), table_name)
    schema = SCHEMA_CACHE.get(key)
    if not schema:
        ddb_client = aws_stack.connect_to_service("dynamodb")
        schema = ddb_client.describe_table(TableName=table_name)
        SCHEMA_CACHE[key] = schema
    return schema


def find_existing_item(put_item, table_name=None):
    table_name = table_name or put_item["TableName"]
    ddb_client = aws_stack.connect_to_service("dynamodb")
    search_key = {}
    if "Key" in put_item:
        search_key = put_item["Key"]
    else:
        schema = get_table_schema(table_name)
        schemas = [schema["Table"]["KeySchema"]]
        for index in schema["Table"].get("GlobalSecondaryIndexes", []):
            # TODO
            # schemas.append(index['KeySchema'])
            pass
        for schema in schemas:
            for key in schema:
                key_name = key["AttributeName"]
                search_key[key_name] = put_item["Item"][key_name]
        if not search_key:
            return
    req = {"TableName": table_name, "Key": search_key}
    existing_item = aws_stack.dynamodb_get_item_raw(req)
    if not existing_item:
        return existing_item
    if "Item" not in existing_item:
        if "message" in existing_item:
            table_names = ddb_client.list_tables()["TableNames"]
            msg = "Unable to get item from DynamoDB (existing tables: %s ...truncated if >100 tables): %s" % (
                table_names,
                existing_item["message"],
            )
            LOGGER.warning(msg)
        return
    return existing_item.get("Item")


def get_error_message(message, error_type):
    response = error_response(message=message, error_type=error_type)
    fix_headers_for_updated_response(response)
    return response


def get_table_not_found_error():
    return get_error_message(
        message="Cannot do operations on a non-existent table",
        error_type="ResourceNotFoundException",
    )


def fix_headers_for_updated_response(response):
    response.headers["Content-Length"] = len(to_bytes(response.content))
    response.headers["x-amz-crc32"] = calculate_crc32(response)


def update_response_content(response, content, status_code=None):
    aws_responses.set_response_content(response, content)
    if status_code:
        response.status_code = status_code
    fix_headers_for_updated_response(response)


def update_put_item_response_content(data, response_content):
    # when return-values variable is set only then attribute data should be returned
    # in the response otherwise by default it should not return any data.
    # https://github.com/localstack/localstack/issues/2121
    if data.get("ReturnValues"):
        response_content = json.dumps({"Attributes": data["Item"]})
    return response_content


def calculate_crc32(response):
    return crc32(to_bytes(response.content)) & 0xFFFFFFFF


def create_dynamodb_stream(data, latest_stream_label):
    stream = data["StreamSpecification"]
    enabled = stream.get("StreamEnabled")
    if enabled not in [False, "False"]:
        table_name = data["TableName"]
        view_type = stream["StreamViewType"]
        dynamodbstreams_api.add_dynamodb_stream(
            table_name=table_name,
            latest_stream_label=latest_stream_label,
            view_type=view_type,
            enabled=enabled,
        )


def forward_to_lambda(records):
    for record in records:
        sources = lambda_api.get_event_sources(source_arn=record["eventSourceARN"])
        event = {"Records": [record]}
        for src in sources:
            if src.get("State") != "Enabled":
                continue
            lambda_api.run_lambda(
                func_arn=src["FunctionArn"],
                event=event,
                context={},
                asynchronous=not config.SYNCHRONOUS_DYNAMODB_EVENTS,
            )


def forward_to_ddb_stream(records):
    dynamodbstreams_api.forward_events(records)


def forward_to_kinesis_stream(records):
    kinesis = aws_stack.connect_to_service("kinesis")
    table_definitions = DynamoDBRegion.get().table_definitions
    for record in records:
        if record.get("eventSourceARN"):
            table_name = record["eventSourceARN"].split("/", 1)[-1]
            table_def = table_definitions.get(table_name) or {}
            if table_def.get("KinesisDataStreamDestinationStatus") == "ACTIVE":
                stream_name = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"].split(
                    "/", 1
                )[-1]
                record["tableName"] = table_name
                record.pop("eventSourceARN", None)
                record["dynamodb"].pop("StreamViewType", None)
                partition_key = list(...
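As the listener code shows, forward_to_kinesis_stream only forwards a record when the originating table has a Kinesis Data Stream destination whose KinesisDataStreamDestinationStatus is ACTIVE. Below is a minimal sketch of how you might trigger that path from outside LocalStack with boto3. It assumes LocalStack is reachable on the default edge endpoint http://localhost:4566; the table name "orders", the stream name "orders-stream", and the sample item are hypothetical placeholders, not part of the original snippet.

import json

import boto3

# Assumption: LocalStack is running locally on the default edge port 4566.
ENDPOINT = "http://localhost:4566"
session = boto3.session.Session(
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1"
)
dynamodb = session.client("dynamodb", endpoint_url=ENDPOINT)
kinesis = session.client("kinesis", endpoint_url=ENDPOINT)

# 1. Create a table and a Kinesis stream (names are placeholders).
dynamodb.create_table(
    TableName="orders",
    KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
    AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
    BillingMode="PAY_PER_REQUEST",
)
dynamodb.get_waiter("table_exists").wait(TableName="orders")
kinesis.create_stream(StreamName="orders-stream", ShardCount=1)
kinesis.get_waiter("stream_exists").wait(StreamName="orders-stream")
stream_desc = kinesis.describe_stream(StreamName="orders-stream")["StreamDescription"]

# 2. Enable the Kinesis streaming destination. This is what sets
#    KinesisDataStreamDestinationStatus to ACTIVE in the table definition,
#    which forward_to_kinesis_stream checks before forwarding records.
dynamodb.enable_kinesis_streaming_destination(
    TableName="orders", StreamArn=stream_desc["StreamArn"]
)

# 3. Any write now produces change records that the DynamoDB listener
#    deep-copies and hands to forward_to_kinesis_stream.
dynamodb.put_item(TableName="orders", Item={"id": {"S": "order-1"}, "total": {"N": "42"}})

# 4. Read the forwarded record back from the Kinesis stream.
shard_id = stream_desc["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName="orders-stream", ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)["ShardIterator"]
for rec in kinesis.get_records(ShardIterator=iterator)["Records"]:
    print(json.loads(rec["Data"]))

Consistent with the tail of forward_to_kinesis_stream above, the record that arrives on the stream carries a tableName field in place of eventSourceARN, and the StreamViewType entry is removed from its dynamodb payload.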


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing for free!

