Best Python code snippet using localstack_python
worker.py
Source:worker.py  
...156    codec: temporalio.converter.PayloadCodec,157) -> None:158    """Encode payloads with the given codec."""159    return await _apply_to_payloads(payloads, codec.encode)160async def _encode_payload(161    payload: temporalio.api.common.v1.Payload,162    codec: temporalio.converter.PayloadCodec,163) -> None:164    """Decode a payload with the given codec."""165    return await _apply_to_payload(payload, codec.encode)166async def decode_activation(167    act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,168    codec: temporalio.converter.PayloadCodec,169) -> None:170    """Decode the given activation with the codec."""171    for job in act.jobs:172        if job.HasField("cancel_workflow"):173            await _decode_payloads(job.cancel_workflow.details, codec)174        elif job.HasField("query_workflow"):175            await _decode_payloads(job.query_workflow.arguments, codec)176        elif job.HasField("resolve_activity"):177            if job.resolve_activity.result.HasField("cancelled"):178                await temporalio.exceptions.decode_failure(179                    job.resolve_activity.result.cancelled.failure, codec180                )181            elif job.resolve_activity.result.HasField("completed"):182                if job.resolve_activity.result.completed.HasField("result"):183                    await _decode_payload(184                        job.resolve_activity.result.completed.result, codec185                    )186            elif job.resolve_activity.result.HasField("failed"):187                await temporalio.exceptions.decode_failure(188                    job.resolve_activity.result.failed.failure, codec189                )190        elif job.HasField("resolve_child_workflow_execution"):191            if job.resolve_child_workflow_execution.result.HasField("cancelled"):192                await temporalio.exceptions.decode_failure(193                    
job.resolve_child_workflow_execution.result.cancelled.failure, codec194                )195            elif job.resolve_child_workflow_execution.result.HasField(196                "completed"197            ) and job.resolve_child_workflow_execution.result.completed.HasField(198                "result"199            ):200                await _decode_payload(201                    job.resolve_child_workflow_execution.result.completed.result, codec202                )203            elif job.resolve_child_workflow_execution.result.HasField("failed"):204                await temporalio.exceptions.decode_failure(205                    job.resolve_child_workflow_execution.result.failed.failure, codec206                )207        elif job.HasField("resolve_child_workflow_execution_start"):208            if job.resolve_child_workflow_execution_start.HasField("cancelled"):209                await temporalio.exceptions.decode_failure(210                    job.resolve_child_workflow_execution_start.cancelled.failure, codec211                )212        elif job.HasField("resolve_request_cancel_external_workflow"):213            if job.resolve_request_cancel_external_workflow.HasField("failure"):214                await temporalio.exceptions.decode_failure(215                    job.resolve_request_cancel_external_workflow.failure, codec216                )217        elif job.HasField("resolve_signal_external_workflow"):218            if job.resolve_signal_external_workflow.HasField("failure"):219                await temporalio.exceptions.decode_failure(220                    job.resolve_signal_external_workflow.failure, codec221                )222        elif job.HasField("signal_workflow"):223            await _decode_payloads(job.signal_workflow.input, codec)224        elif job.HasField("start_workflow"):225            await _decode_payloads(job.start_workflow.arguments, codec)226            if job.start_workflow.HasField("continued_failure"):227                await 
temporalio.exceptions.decode_failure(228                    job.start_workflow.continued_failure, codec229                )230            for val in job.start_workflow.memo.fields.values():231                # This uses API payload not bridge payload232                new_payload = (await codec.decode([val]))[0]233                val.metadata.clear()234                val.metadata.update(new_payload.metadata)235                val.data = new_payload.data236async def encode_completion(237    comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,238    codec: temporalio.converter.PayloadCodec,239) -> None:240    """Recursively encode the given completion with the codec."""241    if comp.HasField("failed"):242        await temporalio.exceptions.encode_failure(comp.failed.failure, codec)243    elif comp.HasField("successful"):244        for command in comp.successful.commands:245            if command.HasField("complete_workflow_execution"):246                if command.complete_workflow_execution.HasField("result"):247                    await _encode_payload(248                        command.complete_workflow_execution.result, codec249                    )250            elif command.HasField("continue_as_new_workflow_execution"):251                await _encode_payloads(252                    command.continue_as_new_workflow_execution.arguments, codec253                )254                for val in command.continue_as_new_workflow_execution.memo.values():255                    await _encode_payload(val, codec)256            elif command.HasField("fail_workflow_execution"):257                await temporalio.exceptions.encode_failure(258                    command.fail_workflow_execution.failure, codec259                )260            elif command.HasField("respond_to_query"):261                if command.respond_to_query.HasField("failed"):262                    await temporalio.exceptions.encode_failure(263                        
command.respond_to_query.failed, codec264                    )265                elif command.respond_to_query.HasField(266                    "succeeded"267                ) and command.respond_to_query.succeeded.HasField("response"):268                    await _encode_payload(269                        command.respond_to_query.succeeded.response, codec270                    )271            elif command.HasField("schedule_activity"):272                await _encode_payloads(command.schedule_activity.arguments, codec)273            elif command.HasField("schedule_local_activity"):274                await _encode_payloads(command.schedule_local_activity.arguments, codec)275            elif command.HasField("signal_external_workflow_execution"):276                await _encode_payloads(277                    command.signal_external_workflow_execution.args, codec278                )279            elif command.HasField("start_child_workflow_execution"):280                await _encode_payloads(281                    command.start_child_workflow_execution.input, codec282                )283                for val in command.start_child_workflow_execution.memo.values():...http.py
Source:http.py  
...11      verify_ssl --> bool, whether to check for SSL certs validity or not (optional)12    """13    _reqs = ['url', 'headers', 'auth', 'timeout_secs', 'verify_ssl']14    DEFAULT_TIMEOUT_SECS = 315    def _encode_payload(self, response, mime_type):16        if 'json' in mime_type:  # json17            return response.json()18        elif 'text' in mime_type:  # text19            return response.text20        else:21            return response.content  # binary data22# Status codes23class StatusCode(node.Node):24    """25    This node returns the status code of a raw HTTP response (requests.Response)26    Requirements:27      response --> requests.Response object28    """29    _reqs = ['response']30    def output(self):31        return self._params['response'].status_code32class StatusCodeOk(node.Node):33    """34    This node tells whether the status code of a raw HTTP response35    (requests.Response)is OK (2xx) or not36    Requirements:37      response --> requests.Response object38    """39    _reqs = ['response']40    def output(self):41        sc = self._params['response'].status_code42        return sc >= 200 and sc < 30043# GET44class RawGet(HttpClient):45    """46    This node returns a raw HTTP response (requests.Response) to a GET request47    Requirements:48      query --> dict, query parameters (optional)49    """50    _reqs = HttpClient._reqs + ['query']51    def output(self):52        r = requests.get(53            self._params['url'],54            auth=self._params['auth'],55            params=self._params['query'],56            headers=self._params['headers'],57            timeout=self._params['timeout_secs'] or self.DEFAULT_TIMEOUT_SECS,58            verify=self._params['verify_ssl'] if self._params['verify_ssl'] is not None else True)59        return r60class Get(RawGet):61    """62    This node returns the payload response to a HTTP GET call. 
The payload format63    depends on the specified MIME type64    Requirements:65      mime_type --> str, MIME type for server response payload66    """67    _reqs = RawGet._reqs + ['mime_type']68    def output(self):69        raw_response = RawGet.output(self)70        payload = self._encode_payload(raw_response,71                                       self._params['mime_type'] or 'application/json')72        return payload73# POST74class RawPost(HttpClient):75    """76    This node returns a raw HTTP response (requests.Response) to a POST request77    Requirements:78      post_data --> dict, the post-ed form data79      files --> dict, multipart post-ed data (optional)80    """81    _reqs = HttpClient._reqs + ['post_data', 'files']82    def output(self):83        r = requests.post(84            self._params['url'],85            data=self._params['post_data'],86            auth=self._params['auth'],87            files=self._params['files'],88            headers=self._params['headers'],89            timeout=self._params['timeout_secs'] or self.DEFAULT_TIMEOUT_SECS,90            verify=self._params['verify_ssl'] if self._params['verify_ssl'] is not None else True)91        return r92class Post(RawPost):93    """94    This node returns the payload response to a HTTP POST call. 
The payload format95    depends on the specified MIME type96    Requirements:97      mime_type --> str, MIME type for server response payload98    """99    _reqs = RawPost._reqs + ['mime_type']100    def output(self):101        raw_response = RawPost.output(self)102        payload = self._encode_payload(raw_response,103                                       self._params['mime_type'] or 'application/json')104        return payload105# PUT106class RawPut(HttpClient):107    """108    This node returns a raw HTTP response (requests.Response) to a PUT request109    Requirements:110      put_data --> dict, the put form data (optional)111    """112    _reqs = HttpClient._reqs + ['put_data']113    def output(self):114        r = requests.put(115            self._params['url'],116            data=self._params['put_data'],117            auth=self._params['auth'],118            headers=self._params['headers'],119            timeout=self._params['timeout_secs'] or self.DEFAULT_TIMEOUT_SECS,120            verify=self._params['verify_ssl'] if self._params['verify_ssl'] is not None else True)121        return r122class Put(RawPut):123    """124    This node returns the payload response to a HTTP PUT call. 
The payload format125    depends on the specified MIME type126    Requirements:127      mime_type --> str, MIME type for server response payload128    """129    _reqs = RawPut._reqs + ['mime_type']130    def output(self):131        raw_response = RawPut.output(self)132        payload = self._encode_payload(raw_response,133                                       self._params['mime_type'] or 'application/json')134        return payload135# DELETE136class RawDelete(HttpClient):137    """138    This node returns a raw HTTP response (requests.Response) to a DELETE request139    Requirements:140      delete_data --> dict, the delete form data (optional)141    """142    _reqs = HttpClient._reqs + ['delete_data']143    def output(self):144        r = requests.put(145            self._params['url'],146            data=self._params['delete_data'],147            auth=self._params['auth'],148            headers=self._params['headers'],149            timeout=self._params['timeout_secs'] or self.DEFAULT_TIMEOUT_SECS,150            verify=self._params['verify_ssl'] if self._params['verify_ssl'] is not None else True)151        return r152class Delete(RawDelete):153    """154    This node returns the payload response to a HTTP DELETE call. The payload155    format depends on the specified MIME type156    Requirements:157      mime_type --> str, MIME type for server response payload158    """159    _reqs = RawDelete._reqs + ['mime_type']160    def output(self):161        raw_response = RawDelete.output(self)162        payload = self._encode_payload(raw_response,163                                       self._params['mime_type'] or 'application/json')...encoder.py
Source:encoder.py  
...8    """9    def __init__(self, payload: supported_types):10        self._payload = payload11    def encode(self) -> supported_types:12        return self._encode_payload(self._payload)13    def _encode_payload(self, payload):14        if isinstance(payload, str):15            return self._from_string(payload)16        elif isinstance(payload, int):17            return self._from_int(payload)18        elif isinstance(payload, list):19            return self._from_list(payload)20        elif isinstance(payload, bytes):21            return self._from_bytes(payload)22        elif isinstance(payload, dict) or isinstance(payload, OrderedDict):23            return self._from_dict(payload)24        else:25            data_type = type(payload).__name__26            err = f'[error] Cannot encode object of type {data_type}'27            raise TypeError(err)28    def _from_dict(self, _dict) -> bytearray:29        # we use a bytearray because it's impossible to30        # concatenate bytearray types to strings31        encoded_dict: bytearray = bytearray('d', 'utf-8')32        for key, value in _dict.items():33            k: Union[bytes, bytearray] = self._encode_payload(key)34            v: Union[bytes, bytearray] = self._encode_payload(value)35            encoded_dict += k36            encoded_dict += v37        encoded_dict += EOT_IDENTIFIER38        return encoded_dict39    def _from_list(self, _list) -> bytearray:40        # we use a bytearray because it's impossible to41        # concatenate bytearray types to strings42        encoded_list: bytearray = bytearray('l', 'utf-8')43        encoded_list += b''.join([44            self._encode_payload(_list[idx])45            for idx in range(len(_list))46        ])47        encoded_list += EOT_IDENTIFIER48        return encoded_list49    def _from_int(self, _int) -> bytes:50        return bytes(f'i{str(_int)}e', 'utf-8')51    def _from_string(self, string) -> bytes:52        return bytes(f'{len(string)}:{string}', 
'utf-8')53    def _from_bytes(self, _bytes) -> bytearray:54        # copied from pieces.bencoding.Encoder55        result = bytearray()56        result += str.encode(str(len(_bytes)))57        result += b':'58        result += _bytes...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
