Best Python code snippet using localstack_python
serializer.py
Source:serializer.py  
...221        """222        event_stream_shape = operation_model.get_event_stream_output()223        event_stream_member_name = operation_model.output_shape.event_stream_name224        # wrap the generator in operation specific serialization225        def event_stream_serializer() -> Iterable[bytes]:226            yield self._encode_event_payload("initial-response")227            # yield convert_to_binary_event_payload("", event_type="initial-response")228            # create a default response229            serialized_event_response = self._create_default_response(operation_model)230            # get the members of the event stream shape231            event_stream_shape_members = (232                event_stream_shape.members if event_stream_shape is not None else None233            )234            # extract the generator from the given response data235            event_generator = response.get(event_stream_member_name)236            if not isinstance(event_generator, Iterator):237                raise ProtocolSerializerError(238                    "Expected iterator for streaming event serialization."239                )240            # yield one event per generated event241            for event in event_generator:242                # find the actual event payload (the member with event=true)243                event_member = None244                for member in event_stream_shape_members.values():245                    if member.serialization.get("event"):246                        event_member = member247                        break248                if event_member is None:249                    raise UnknownSerializerError("Couldn't find event shape for serialization.")250                # serialize the part of the response for the event251                self._serialize_response(252                    event.get(event_member.name),253                    serialized_event_response,254                    event_member,255                    event_member.members if event_member is not None else None,256                    operation_model,257                )258                # execute additional response traits (might be modifying the response)259                serialized_event_response = self._prepare_additional_traits_in_response(260                    serialized_event_response, operation_model261                )262                # encode the event and yield it263                yield self._encode_event_payload(264                    event_type=event_member.name, content=serialized_event_response.data265                )266        return HttpResponse(267            response=event_stream_serializer(),268            status=operation_model.http.get("responseCode", 200),269        )270    def _encode_event_payload(271        self,272        event_type: str,273        content: Union[str, bytes] = "",274        error_code: Optional[str] = None,275        error_message: Optional[str] = None,276    ) -> bytes:277        """278        Encodes the given event payload according to AWS specific binary event encoding.279        A specification of the format can be found in the AWS docs:280        https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html281        :param content: string or bytes of the event payload...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
