Best Python code snippet using tempest_python
protocol.py
Source:protocol.py  
...13    def __init__(self, transport):14        self.transport = transport15        transport.set_listener('protocol-listener', self)16        self.version = '1.0'17    def send_frame(self, cmd, headers={}, body=''):18        frame = utils.Frame(cmd, headers, body)19        self.transport.transmit(frame)20    def abort(self, transaction, headers={}, **keyword_headers):21        assert transaction is not None, "'transaction' is required"22        headers = utils.merge_headers([headers, keyword_headers])23        headers[HDR_TRANSACTION] = transaction24        self.send_frame(CMD_ABORT, headers)25    def ack(self, id, transaction=None):26        assert id is not None, "'id' is required"27        headers = { HDR_MESSAGE_ID : id }28        if transaction:29            headers[HDR_TRANSACTION] = transaction30        self.send_frame(CMD_ACK, headers)31    def begin(self, transaction=None, headers={}, **keyword_headers):32        headers = utils.merge_headers([headers, keyword_headers])33        if not transaction:34            transaction = str(uuid.uuid4())35        headers[HDR_TRANSACTION] = transaction36        self.send_frame(CMD_BEGIN, headers)37        return transaction38    def commit(self, transaction=None, headers={}, **keyword_headers):39        assert transaction is not None, "'transaction' is required"40        headers = utils.merge_headers([headers, keyword_headers])41        headers[HDR_TRANSACTION] = transaction42        self.send_frame('COMMIT', headers)43    def connect(self, username=None, passcode=None, wait=False, headers={}, **keyword_headers):44        cmd = CMD_CONNECT45        headers = utils.merge_headers([headers, keyword_headers])46        headers[HDR_ACCEPT_VERSION] = self.version47        if username is not None:48            headers[HDR_LOGIN] = username49        if passcode is not None:50            headers[HDR_PASSCODE] = passcode51        self.send_frame(cmd, headers)52        if wait:53            self.transport.wait_for_connection()54            if self.transport.connection_error:55                raise ConnectFailedException()56    def disconnect(self, receipt=str(uuid.uuid4()), headers={}, **keyword_headers):57        headers = utils.merge_headers([headers, keyword_headers])58        if receipt:59            headers[HDR_RECEIPT] = receipt60        self.send_frame(CMD_DISCONNECT, headers)61    def send(self, destination, body, content_type=None, headers={}, **keyword_headers):62        assert destination is not None, "'destination' is required"63        assert body is not None, "'body' is required"64        headers = utils.merge_headers([headers, keyword_headers])65        headers[HDR_DESTINATION] = destination66        if content_type:67            headers[HDR_CONTENT_TYPE] = content_type68        body = encode(body)69        if body and HDR_CONTENT_LENGTH not in headers:70            headers[HDR_CONTENT_LENGTH] = len(body)71        self.send_frame(CMD_SEND, headers, body)72    def subscribe(self, destination, id=None, ack='auto', headers={}, **keyword_headers):73        assert destination is not None, "'destination' is required"74        headers = utils.merge_headers([headers, keyword_headers])75        headers[HDR_DESTINATION] = destination76        if id:77            headers[HDR_ID] = id78        headers[HDR_ACK] = ack79        self.send_frame(CMD_SUBSCRIBE, headers)80    def unsubscribe(self, destination=None, id=None, headers={}, **keyword_headers):81        assert id is not None or destination is not None, "'id' or 'destination' is required"82        headers = utils.merge_headers([headers, keyword_headers])83        if id:84            headers[HDR_ID] = id85        if destination:86            headers[HDR_DESTINATION] = destination87        self.send_frame(CMD_UNSUBSCRIBE, headers)88class Protocol11(HeartbeatListener, ConnectionListener):89    """90    Version 1.1 of the protocol.91    """92    def __init__(self, transport, heartbeats=(0, 0)):93        HeartbeatListener.__init__(self, heartbeats)94        self.transport = transport95        transport.set_listener('protocol-listener', self)96        self.version = '1.1'97    def _escape_headers(self, headers):98        for key,val in headers.items():99            try:100                val = val.replace('\\', '\\\\').replace('\n', '\\n').replace(':', '\\c')101            except: pass102            headers[key] = val103    def send_frame(self, cmd, headers={}, body=''):104        if cmd != CMD_CONNECT:105            self._escape_headers(headers)106        frame = utils.Frame(cmd, headers, body)107        self.transport.transmit(frame)108    def abort(self, transaction, headers={}, **keyword_headers):109        assert transaction is not None, "'transaction' is required"110        headers = utils.merge_headers([headers, keyword_headers])111        headers[HDR_TRANSACTION] = transaction112        self.send_frame(CMD_ABORT, headers)113    def ack(self, id, subscription, transaction=None):114        assert id is not None, "'id' is required"115        assert subscription is not None, "'subscription' is required"116        headers = { HDR_MESSAGE_ID : id, HDR_SUBSCRIPTION : subscription }117        if transaction:118            headers[HDR_TRANSACTION] = transaction119        self.send_frame(CMD_ACK, headers)120    def begin(self, transaction=None, headers={}, **keyword_headers):121        headers = utils.merge_headers([headers, keyword_headers])122        if not transaction:123            transaction = str(uuid.uuid4())124        headers[HDR_TRANSACTION] = transaction125        self.send_frame(CMD_BEGIN, headers)126        return transaction127    def commit(self, transaction=None, headers={}, **keyword_headers):128        assert transaction is not None, "'transaction' is required"129        headers = utils.merge_headers([headers, keyword_headers])130        headers[HDR_TRANSACTION] = transaction131        self.send_frame('COMMIT', headers)132    def connect(self, username=None, passcode=None, wait=False, headers={}, **keyword_headers):133        cmd = CMD_STOMP134        headers = utils.merge_headers([headers, keyword_headers])135        headers[HDR_ACCEPT_VERSION] = self.version136        if self.transport.vhost:137            headers[HDR_HOST] = self.transport.vhost138        if username is not None:139            headers[HDR_LOGIN] = username140        if passcode is not None:141            headers[HDR_PASSCODE] = passcode142        self.send_frame(cmd, headers)143        if wait:144            self.transport.wait_for_connection()145            if self.transport.connection_error:146                raise ConnectFailedException()147    def disconnect(self, receipt=str(uuid.uuid4()), headers={}, **keyword_headers):148        headers = utils.merge_headers([headers, keyword_headers])149        if receipt:150            headers[HDR_RECEIPT] = receipt151        self.send_frame(CMD_DISCONNECT, headers)152    def nack(self, id, subscription, transaction=None):153        assert id is not None, "'id' is required"154        assert subscription is not None, "'subscription' is required"155        headers = { HDR_MESSAGE_ID : id, HDR_SUBSCRIPTION : subscription }156        if transaction:157            headers[HDR_TRANSACTION] = transaction158        self.send_frame(CMD_NACK, headers)159    def send(self, destination, body, content_type=None, headers={}, **keyword_headers):160        assert destination is not None, "'destination' is required"161        assert body is not None, "'body' is required"162        headers = utils.merge_headers([headers, keyword_headers])163        headers[HDR_DESTINATION] = destination164        if content_type:165            headers[HDR_CONTENT_TYPE] = content_type166        body = encode(body)167        if body and HDR_CONTENT_LENGTH not in headers:168            headers[HDR_CONTENT_LENGTH] = len(body)169        self.send_frame(CMD_SEND, headers, body)170    def subscribe(self, destination, id, ack='auto', headers={}, **keyword_headers):171        assert destination is not None, "'destination' is required"172        assert id is not None, "'id' is required"173        headers = utils.merge_headers([headers, keyword_headers])174        headers[HDR_DESTINATION] = destination175        headers[HDR_ID] = id176        headers[HDR_ACK] = ack177        self.send_frame(CMD_SUBSCRIBE, headers)178    def unsubscribe(self, id, headers={}, **keyword_headers):179        assert id is not None, "'id' is required"180        headers = utils.merge_headers([headers, keyword_headers])181        headers[HDR_ID] = id182        self.send_frame(CMD_UNSUBSCRIBE, headers)183class Protocol12(Protocol11):184    """185    Version 1.2 of the protocol.186    """187    def __init__(self, transport, heartbeats=(0, 0)):188        Protocol11.__init__(self, transport, heartbeats)189        self.version = '1.2'190    def _escape_headers(self, headers):191        for key,val in headers.items():192            try:193                val = val.replace('\\', '\\\\').replace('\n', '\\n').replace(':', '\\c').replace('\r', '\\r')194            except: pass195            headers[key] = val196    def ack(self, id, transaction=None):197        assert id is not None, "'id' is required"198        headers = { HDR_ID : id }199        if transaction:200            headers[HDR_TRANSACTION] = transaction201        self.send_frame(CMD_ACK, headers)202    def nack(self, id, transaction=None):203        assert id is not None, "'id' is required"204        headers = { HDR_ID : id }205        if transaction:206            headers[HDR_TRANSACTION] = transaction207        self.send_frame(CMD_NACK, headers)208    def connect(self, username=None, passcode=None, wait=False, headers={}, **keyword_headers):209        """210        Send a STOMP CONNECT frame. Differs from 1.0 and 1.1 versions in that the HOST header is enforced.211        \param username212            optionally specify the login user213        \param passcode214            optionally specify the user password215        \param wait216            wait for the connection to complete before returning217        """218        cmd = CMD_STOMP219        headers = utils.merge_headers([headers, keyword_headers])220        headers[HDR_ACCEPT_VERSION] = self.version221        headers[HDR_HOST] = self.transport.current_host_and_port[0]222        if self.transport.vhost:223            headers[HDR_HOST] = self.transport.vhost224        if username is not None:225            headers[HDR_LOGIN] = username226        if passcode is not None:227            headers[HDR_PASSCODE] = passcode228        self.send_frame(cmd, headers)229        if wait:230            self.transport.wait_for_connection()231            if self.transport.connection_error:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
