How to use the _serialize_event_stream method in localstack

Best Python code snippet using localstack_python

serializer.py

Source: serializer.py (GitHub)

copy

Full Screen

...141 :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)142 """143 # if the operation has a streaming output, handle the serialization differently144 if operation_model.has_event_stream_output:145 return self._serialize_event_stream(response, operation_model)146 serialized_response = self._create_default_response(operation_model)147 shape = operation_model.output_shape148 # The shape can also be none (for empty responses), but it still needs to be serialized (to add some metadata)149 shape_members = shape.members if shape is not None else None150 self._serialize_response(151 response, serialized_response, shape, shape_members, operation_model152 )153 serialized_response = self._prepare_additional_traits_in_response(154 serialized_response, operation_model155 )156 return serialized_response157 @_handle_exceptions158 def serialize_error_to_response(159 self, error: ServiceException, operation_model: OperationModel160 ) -> HttpResponse:161 """162 Takes an error instance and serializes it to an actual HttpResponse.163 Therefore this method is used for errors which should be serialized and transmitted to the calling client.164 :param error: to serialize165 :param operation_model: specification of the service & operation containing information about the shape of the166 service's output / response167 :return: HttpResponse which can be sent to the calling client168 :raises: ResponseSerializerError (either a ProtocolSerializerError or an UnknownSerializerError)169 """170 # TODO implement streaming error serialization171 serialized_response = self._create_default_response(operation_model)172 if not error or not isinstance(error, ServiceException):173 raise ProtocolSerializerError(174 f"Error to serialize ({error.__class__.__name__ if error else None}) is not a ServiceException."175 )176 shape = operation_model.service_model.shape_for_error_code(error.code)177 serialized_response.status_code = error.status_code178 
self._serialize_error(error, serialized_response, shape, operation_model)179 serialized_response = self._prepare_additional_traits_in_response(180 serialized_response, operation_model181 )182 return serialized_response183 def _serialize_response(184 self,185 parameters: dict,186 response: HttpResponse,187 shape: Optional[Shape],188 shape_members: dict,189 operation_model: OperationModel,190 ) -> None:191 raise NotImplementedError192 def _serialize_body_params(193 self, params: dict, shape: Shape, operation_model: OperationModel194 ) -> Optional[str]:195 """196 Actually serializes the given params for the given shape to a string for the transmission in the body of the197 response.198 :param params: to serialize199 :param shape: to know how to serialize the params200 :param operation_model: for additional metadata201 :return: string containing the serialized body202 """203 raise NotImplementedError204 def _serialize_error(205 self,206 error: ServiceException,207 response: HttpResponse,208 shape: StructureShape,209 operation_model: OperationModel,210 ) -> None:211 raise NotImplementedError212 def _serialize_event_stream(213 self, response: dict, operation_model: OperationModel214 ) -> HttpResponse:215 """216 Serializes a given response dict (the return payload of a service implementation) to an _event stream_ using the217 given operation model.218 :param response: dictionary containing the payload for the response219 :param operation_model: describing the operation the response dict is being returned by220 :return: HttpResponse which can directly be sent to the client (in chunks)221 """222 event_stream_shape = operation_model.get_event_stream_output()223 event_stream_member_name = operation_model.output_shape.event_stream_name224 # wrap the generator in operation specific serialization225 def event_stream_serializer() -> Iterable[bytes]:226 yield self._encode_event_payload("initial-response")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful