Best Python code snippet using localstack_python
test_serializer.py
Source:test_serializer.py  
...1142    assert "headers_key1" in response["ResponseHeaders"]1143    assert "headers_key2" in response["ResponseHeaders"]1144    assert "headers_value1" == response["ResponseHeaders"]["headers_key1"]1145    assert "headers_value2" == response["ResponseHeaders"]["headers_key2"]1146def _iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):1147    """1148    Concerts a given iterable (generator) to a stream for testing purposes.1149    This wrapper does not "real streaming". The event serializer will wait for the end of the stream / generator.1150    """1151    class IterStream(io.RawIOBase):1152        def __init__(self):1153            self.leftover = None1154        def readable(self):1155            return True1156        def readinto(self, b):1157            try:1158                chunk = next(iterable)1159                b[: len(chunk)] = chunk1160                return len(chunk)1161            except StopIteration:1162                return 01163    return io.BufferedReader(IterStream(), buffer_size=buffer_size)1164def test_json_event_streaming():1165    # Create test events from Kinesis' SubscribeToShard operation1166    event_1 = {1167        "SubscribeToShardEvent": {1168            "Records": [1169                {1170                    "SequenceNumber": "1",1171                    "Data": b"event_data_1_record_1",1172                    "PartitionKey": "event_1_partition_key_record_1",1173                },1174                {1175                    "SequenceNumber": "2",1176                    "Data": b"event_data_1_record_2",1177                    "PartitionKey": "event_1_partition_key_record_1",1178                },1179            ],1180            "ContinuationSequenceNumber": "1",1181            "MillisBehindLatest": 1337,1182        }1183    }1184    event_2 = {1185        "SubscribeToShardEvent": {1186            "Records": [1187                {1188                    "SequenceNumber": "3",1189                    "Data": 
b"event_data_2_record_1",1190                    "PartitionKey": "event_2_partition_key_record_1",1191                },1192                {1193                    "SequenceNumber": "4",1194                    "Data": b"event_data_2_record_2",1195                    "PartitionKey": "event_2_partition_key_record_2",1196                },1197            ],1198            "ContinuationSequenceNumber": "2",1199            "MillisBehindLatest": 1338,1200        }1201    }1202    # Create the response which contains the generator1203    def event_generator() -> Iterator:1204        yield event_11205        yield event_21206    response = {"EventStream": event_generator()}1207    # Serialize the response1208    service = load_service("kinesis")1209    operation_model = service.operation_model("SubscribeToShard")1210    response_serializer = create_serializer(service)1211    serialized_response = response_serializer.serialize_to_response(response, operation_model)1212    # Convert the Werkzeug response from our serializer to a response botocore can work with1213    urllib_response = UrlLibHttpResponse(1214        body=_iterable_to_stream(serialized_response.response),1215        headers={1216            entry_tuple[0].lower(): entry_tuple[1] for entry_tuple in serialized_response.headers1217        },1218        status=serialized_response.status_code,1219        decode_content=False,1220        preload_content=False,1221    )1222    requests_response = RequestsResponse()1223    requests_response.headers = urllib_response.headers1224    requests_response.status_code = urllib_response.status1225    requests_response.raw = urllib_response1226    botocore_response = convert_to_response_dict(requests_response, operation_model)1227    # parse the response using botocore1228    response_parser: ResponseParser = create_parser(service.protocol)...stream.py
Source:stream.py  
...38            You'll get the raw elements yielded by the generator.39        """40        return self._iterator41    def as_file(self) -> io.BufferedReader:42        return self._iterable_to_stream(self._iterator, self.encode_record_as_bytes)43    def readlines(self):44        """45            Yield each element of a the generator, one by one.46            (ex: line by line for file)47        """48        for record in self:49            yield self.decode_record(self.encode_record(record))50    @classmethod51    def create_from_stream(cls, source_stream):52        if isinstance(source_stream, cls):53            return source_stream54        return cls(source_stream.name, source_stream.readlines())55    @classmethod56    def encode_record_as_bytes(cls, record) -> bytes:57        return (cls.encode_record(record) + "\n").encode("utf-8")58    @classmethod59    def encode_record(cls, record) -> str:60        raise NotImplementedError61    @classmethod62    def decode_record(cls, record):63        raise NotImplementedError64    @staticmethod65    def create_stream_name(name):66        ts = time.time()67        ts_as_string = datetime.fromtimestamp(ts).strftime("%Y-%m-%d-%H-%M-%S")68        return f"{name}_{ts_as_string}"69    @property70    def name(self):71        return ".".join(filter(None, [self._name, self.extension]))72    @staticmethod73    def _iterable_to_stream(iterable, encode, buffer_size=io.DEFAULT_BUFFER_SIZE):74        """75        Credit goes to 'Mechanical snail'76            at https://stackoverflow.com/questions/6657820/python-convert-an-iterable-to-a-stream77        Lets you use an iterable (e.g. 
a generator) that yields bytestrings as a78        read-only79        input stream.80        The stream implements Python 3's newer I/O API (available in Python 2's io81        module).82        For efficiency, the stream is buffered.83        """84        class IterStream(io.RawIOBase):85            def __init__(self):86                self.leftover = None87                self.count = 0...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
