How to use the _iterable_to_stream method in localstack

Best Python code snippet using localstack_python

test_serializer.py

Source:test_serializer.py Github

copy

Full Screen

...1142 assert "headers_key1" in response["ResponseHeaders"]1143 assert "headers_key2" in response["ResponseHeaders"]1144 assert "headers_value1" == response["ResponseHeaders"]["headers_key1"]1145 assert "headers_value2" == response["ResponseHeaders"]["headers_key2"]1146def _iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):1147 """1148 Concerts a given iterable (generator) to a stream for testing purposes.1149 This wrapper does not "real streaming". The event serializer will wait for the end of the stream / generator.1150 """1151 class IterStream(io.RawIOBase):1152 def __init__(self):1153 self.leftover = None1154 def readable(self):1155 return True1156 def readinto(self, b):1157 try:1158 chunk = next(iterable)1159 b[: len(chunk)] = chunk1160 return len(chunk)1161 except StopIteration:1162 return 01163 return io.BufferedReader(IterStream(), buffer_size=buffer_size)1164def test_json_event_streaming():1165 # Create test events from Kinesis' SubscribeToShard operation1166 event_1 = {1167 "SubscribeToShardEvent": {1168 "Records": [1169 {1170 "SequenceNumber": "1",1171 "Data": b"event_data_1_record_1",1172 "PartitionKey": "event_1_partition_key_record_1",1173 },1174 {1175 "SequenceNumber": "2",1176 "Data": b"event_data_1_record_2",1177 "PartitionKey": "event_1_partition_key_record_1",1178 },1179 ],1180 "ContinuationSequenceNumber": "1",1181 "MillisBehindLatest": 1337,1182 }1183 }1184 event_2 = {1185 "SubscribeToShardEvent": {1186 "Records": [1187 {1188 "SequenceNumber": "3",1189 "Data": b"event_data_2_record_1",1190 "PartitionKey": "event_2_partition_key_record_1",1191 },1192 {1193 "SequenceNumber": "4",1194 "Data": b"event_data_2_record_2",1195 "PartitionKey": "event_2_partition_key_record_2",1196 },1197 ],1198 "ContinuationSequenceNumber": "2",1199 "MillisBehindLatest": 1338,1200 }1201 }1202 # Create the response which contains the generator1203 def event_generator() -> Iterator:1204 yield event_11205 yield event_21206 response = {"EventStream": 
event_generator()}1207 # Serialize the response1208 service = load_service("kinesis")1209 operation_model = service.operation_model("SubscribeToShard")1210 response_serializer = create_serializer(service)1211 serialized_response = response_serializer.serialize_to_response(response, operation_model)1212 # Convert the Werkzeug response from our serializer to a response botocore can work with1213 urllib_response = UrlLibHttpResponse(1214 body=_iterable_to_stream(serialized_response.response),1215 headers={1216 entry_tuple[0].lower(): entry_tuple[1] for entry_tuple in serialized_response.headers1217 },1218 status=serialized_response.status_code,1219 decode_content=False,1220 preload_content=False,1221 )1222 requests_response = RequestsResponse()1223 requests_response.headers = urllib_response.headers1224 requests_response.status_code = urllib_response.status1225 requests_response.raw = urllib_response1226 botocore_response = convert_to_response_dict(requests_response, operation_model)1227 # parse the response using botocore1228 response_parser: ResponseParser = create_parser(service.protocol)...

Full Screen

Full Screen

stream.py

Source:stream.py Github

copy

Full Screen

...38 You'll get the raw elements yielded by the generator.39 """40 return self._iterator41 def as_file(self) -> io.BufferedReader:42 return self._iterable_to_stream(self._iterator, self.encode_record_as_bytes)43 def readlines(self):44 """45 Yield each element of a the generator, one by one.46 (ex: line by line for file)47 """48 for record in self:49 yield self.decode_record(self.encode_record(record))50 @classmethod51 def create_from_stream(cls, source_stream):52 if isinstance(source_stream, cls):53 return source_stream54 return cls(source_stream.name, source_stream.readlines())55 @classmethod56 def encode_record_as_bytes(cls, record) -> bytes:57 return (cls.encode_record(record) + "\n").encode("utf-8")58 @classmethod59 def encode_record(cls, record) -> str:60 raise NotImplementedError61 @classmethod62 def decode_record(cls, record):63 raise NotImplementedError64 @staticmethod65 def create_stream_name(name):66 ts = time.time()67 ts_as_string = datetime.fromtimestamp(ts).strftime("%Y-%m-%d-%H-%M-%S")68 return f"{name}_{ts_as_string}"69 @property70 def name(self):71 return ".".join(filter(None, [self._name, self.extension]))72 @staticmethod73 def _iterable_to_stream(iterable, encode, buffer_size=io.DEFAULT_BUFFER_SIZE):74 """75 Credit goes to 'Mechanical snail'76 at https://stackoverflow.com/questions/6657820/python-convert-an-iterable-to-a-stream77 Lets you use an iterable (e.g. a generator) that yields bytestrings as a78 read-only79 input stream.80 The stream implements Python 3's newer I/O API (available in Python 2's io81 module).82 For efficiency, the stream is buffered.83 """84 class IterStream(io.RawIOBase):85 def __init__(self):86 self.leftover = None87 self.count = 0...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful