Best Python code snippet using tavern
decoder.py
Source:decoder.py  
import re
import struct
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from io import BytesIO

from .types import (
    CBORDecodeValueError, CBORDecodeEOF, CBORTag, undefined, break_marker,
    CBORSimpleValue, FrozenDict)

# Timestamp layout accepted for semantic tag 0. Groups: year, month, day,
# hour, minute, second, up to six fractional digits (further digits are
# matched but dropped), then either "Z" or a signed "hh" and "mm" UTC offset.
timestamp_re = re.compile(r'^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)'
                          r'(?:\.(\d{1,6})\d*)?(?:Z|([+-]\d\d):(\d\d))$')


class CBORDecoder:
    """
    The CBORDecoder class implements a fully featured `CBOR`_ decoder with
    several extensions for handling shared references, big integers, rational
    numbers and so on. Typically the class is not used directly, but the
    :func:`load` and :func:`loads` functions are called to indirectly construct
    and use the class.

    When the class is constructed manually, the main entry points are
    :meth:`decode` and :meth:`decode_from_bytes`.

    :param tag_hook:
        callable that takes 2 arguments: the decoder instance, and the
        :class:`CBORTag` to be decoded. This callback is invoked for any tags
        for which there is no built-in decoder. The return value is substituted
        for the :class:`CBORTag` object in the deserialized output.
    :param object_hook:
        callable that takes 2 arguments: the decoder instance, and a
        dictionary. This callback is invoked for each deserialized
        :class:`dict` object. The return value is substituted for the dict in
        the deserialized output.

    .. _CBOR: https://cbor.io/
    """

    __slots__ = (
        '_tag_hook', '_object_hook', '_share_index', '_shareables', '_fp_read',
        '_immutable', '_str_errors')

    def __init__(self, fp, tag_hook=None, object_hook=None,
                 str_errors='strict'):
        # These four assignments go through the validating property setters.
        self.fp = fp
        self.tag_hook = tag_hook
        self.object_hook = object_hook
        self.str_errors = str_errors
        # Index into ``_shareables`` of the value currently being decoded under
        # a "shareable" tag, or None when no such tag is active.
        self._share_index = None
        self._shareables = []
        self._immutable = False

    @property
    def immutable(self):
        """
        Used by decoders to check if the calling context requires an immutable
        type.  Object_hook or tag_hook should raise an exception if this flag
        is set unless the result can be safely used as a dict key.
        """
        return self._immutable

    @property
    def fp(self):
        """The object whose bound ``read`` method is used as the input."""
        return self._fp_read.__self__

    @fp.setter
    def fp(self, value):
        try:
            if not callable(value.read):
                raise ValueError('fp.read is not callable')
        except AttributeError:
            raise ValueError('fp object has no read method')
        else:
            # Only the bound method is stored; the ``fp`` getter recovers the
            # stream object from it.
            self._fp_read = value.read

    @property
    def tag_hook(self):
        """The callback for tags without a built-in decoder (or ``None``)."""
        return self._tag_hook

    @tag_hook.setter
    def tag_hook(self, value):
        if value is None or callable(value):
            self._tag_hook = value
        else:
            raise ValueError('tag_hook must be None or a callable')

    @property
    def object_hook(self):
        """The callback applied to every decoded map (or ``None``)."""
        return self._object_hook

    @object_hook.setter
    def object_hook(self, value):
        if value is None or callable(value):
            self._object_hook = value
        else:
            raise ValueError('object_hook must be None or a callable')

    @property
    def str_errors(self):
        """The ``errors`` argument passed to ``bytes.decode`` for strings."""
        return self._str_errors

    @str_errors.setter
    def str_errors(self, value):
        # NOTE(review): 'error' is accepted here but does not look like a
        # standard codec error-handler name -- confirm it is intended.
        if value in ('strict', 'error', 'replace'):
            self._str_errors = value
        else:
            raise ValueError(
                "invalid str_errors value {!r} (must be one of 'strict', "
                "'error', or 'replace')".format(value))

    def set_shareable(self, value):
        """
        Set the shareable value for the last encountered shared value marker,
        if any. If the current shared index is ``None``, nothing is done.

        :param value: the shared value
        :returns: the shared value to permit chaining
        """
        if self._share_index is not None:
            self._shareables[self._share_index] = value
        return value

    def read(self, amount):
        """
        Read bytes from the data stream.

        :param int amount: the number of bytes to read
        :raises CBORDecodeEOF: if fewer than ``amount`` bytes were returned
        """
        data = self._fp_read(amount)
        if len(data) < amount:
            raise CBORDecodeEOF(
                'premature end of stream (expected to read {} bytes, got {} '
                'instead)'.format(amount, len(data)))
        return data

    def _decode(self, immutable=False, unshared=False):
        """
        Decode one item, dispatching on the major type of its initial byte.

        :param bool immutable: force the immutable flag on while decoding
        :param bool unshared: clear the current share index while decoding, so
            the decoded item is not recorded as the pending shared value
        """
        if immutable:
            old_immutable = self._immutable
            self._immutable = True
        if unshared:
            old_index = self._share_index
            self._share_index = None
        try:
            initial_byte = self.read(1)[0]
            major_type = initial_byte >> 5  # top three bits
            subtype = initial_byte & 31     # low five bits
            decoder = major_decoders[major_type]
            return decoder(self, subtype)
        finally:
            # Restore the caller's context even when decoding fails.
            if immutable:
                self._immutable = old_immutable
            if unshared:
                self._share_index = old_index

    def decode(self):
        """
        Decode the next value from the stream.

        :raises CBORDecodeError: if there is any problem decoding the stream
        """
        return self._decode()

    def decode_from_bytes(self, buf):
        """
        Wrap the given bytestring as a file and call :meth:`decode` with it as
        the argument.

        This method was intended to be used from the ``tag_hook`` hook when an
        object needs to be decoded separately from the rest but while still
        taking advantage of the shared value registry.
        """
        with BytesIO(buf) as fp:
            old_fp = self.fp
            self.fp = fp
            try:
                return self._decode()
            finally:
                # Always switch back to the original stream; otherwise a
                # decoding error would leave the decoder bound to the
                # temporary BytesIO, which is closed on leaving this block.
                self.fp = old_fp

    def _decode_length(self, subtype, allow_indefinite=False):
        """
        Decode the unsigned argument that follows an initial byte.

        :param int subtype: the low five bits of the initial byte
        :param bool allow_indefinite: accept subtype 31 and return ``None``
        :returns: the integer argument, or ``None`` for an indefinite length
        :raises CBORDecodeValueError: for any other subtype
        """
        if subtype < 24:
            return subtype
        elif subtype == 24:
            return self.read(1)[0]
        elif subtype == 25:
            return struct.unpack('>H', self.read(2))[0]
        elif subtype == 26:
            return struct.unpack('>L', self.read(4))[0]
        elif subtype == 27:
            return struct.unpack('>Q', self.read(8))[0]
        elif subtype == 31 and allow_indefinite:
            return None
        else:
            raise CBORDecodeValueError(
                'unknown unsigned integer subtype 0x%x' % subtype)

    def decode_uint(self, subtype):
        """Decode an unsigned integer (major type 0)."""
        return self.set_shareable(self._decode_length(subtype))

    def decode_negint(self, subtype):
        """Decode a negative integer (major type 1) as ``-argument - 1``."""
        return self.set_shareable(-self._decode_length(subtype) - 1)

    def decode_bytestring(self, subtype):
        """
        Decode a byte string (major type 2).

        An indefinite-length string is read chunk by chunk until the 0xff
        byte; every chunk must itself be a definite-length byte string.
        """
        length = self._decode_length(subtype, allow_indefinite=True)
        if length is None:
            # Indefinite length
            buf = []
            while True:
                initial_byte = self.read(1)[0]
                if initial_byte == 0xff:
                    result = b''.join(buf)
                    break
                elif initial_byte >> 5 == 2:
                    length = self._decode_length(initial_byte & 0x1f)
                    value = self.read(length)
                    buf.append(value)
                else:
                    raise CBORDecodeValueError(
                        "non-bytestring found in indefinite length bytestring")
        else:
            result = self.read(length)
        return self.set_shareable(result)

    def decode_string(self, subtype):
        """
        Decode a UTF-8 text string (major type 3).

        Bytes are decoded with the configured ``str_errors`` handler.
        """
        length = self._decode_length(subtype, allow_indefinite=True)
        if length is None:
            # Indefinite length
            # NOTE: It may seem redundant to repeat this code to handle UTF-8
            # strings but there is a reason to do this separately to
            # byte-strings. Specifically, the CBOR spec states (in sec. 2.2):
            #
            #     Text strings with indefinite lengths act the same as byte
            #     strings with indefinite lengths, except that all their chunks
            #     MUST be definite-length text strings.  Note that this implies
            #     that the bytes of a single UTF-8 character cannot be spread
            #     between chunks: a new chunk can only be started at a
            #     character boundary.
            #
            # This precludes using the indefinite bytestring decoder above as
            # that would happily ignore UTF-8 characters split across chunks.
            buf = []
            while True:
                initial_byte = self.read(1)[0]
                if initial_byte == 0xff:
                    result = ''.join(buf)
                    break
                elif initial_byte >> 5 == 3:
                    length = self._decode_length(initial_byte & 0x1f)
                    value = self.read(length).decode('utf-8', self._str_errors)
                    buf.append(value)
                else:
                    raise CBORDecodeValueError(
                        "non-string found in indefinite length string")
        else:
            result = self.read(length).decode('utf-8', self._str_errors)
        return self.set_shareable(result)

    def decode_array(self, subtype):
        """
        Decode an array (major type 4).

        Returns a ``list``, or a ``tuple`` when the immutable flag is set.
        """
        length = self._decode_length(subtype, allow_indefinite=True)
        items = []
        if not self._immutable:
            # Register the list before decoding its members so that a shared
            # reference inside the array can point back at it.
            self.set_shareable(items)
        if length is None:
            # Indefinite length
            while True:
                value = self._decode()
                if value is break_marker:
                    break
                items.append(value)
        else:
            for _ in range(length):
                items.append(self._decode())
        if self._immutable:
            # A tuple cannot be filled in after creation, so it is only
            # registered once complete.
            items = tuple(items)
            self.set_shareable(items)
        return items

    def decode_map(self, subtype):
        """
        Decode a map (major type 5).

        Keys are decoded with the immutable flag set. The result is passed
        through ``object_hook`` when one is configured; otherwise it is
        wrapped in :class:`FrozenDict` when the immutable flag is set.
        """
        length = self._decode_length(subtype, allow_indefinite=True)
        dictionary = {}
        self.set_shareable(dictionary)
        if length is None:
            # Indefinite length
            while True:
                key = self._decode(immutable=True, unshared=True)
                if key is break_marker:
                    break
                dictionary[key] = self._decode(unshared=True)
        else:
            for _ in range(length):
                key = self._decode(immutable=True, unshared=True)
                dictionary[key] = self._decode(unshared=True)
        if self._object_hook:
            dictionary = self._object_hook(self, dictionary)
            self.set_shareable(dictionary)
        elif self._immutable:
            dictionary = FrozenDict(dictionary)
            self.set_shareable(dictionary)
        return dictionary

    def decode_semantic(self, subtype):
        """
        Decode a tagged item (major type 6).

        Tags listed in ``semantic_decoders`` are handled by the matching
        method. Any other tag becomes a :class:`CBORTag`, which is passed
        through ``tag_hook`` when one is configured.
        """
        tagnum = self._decode_length(subtype)
        semantic_decoder = semantic_decoders.get(tagnum)
        if semantic_decoder:
            return semantic_decoder(self)
        else:
            tag = CBORTag(tagnum, None)
            self.set_shareable(tag)
            tag.value = self._decode(unshared=True)
            if self._tag_hook:
                tag = self._tag_hook(self, tag)
            return self.set_shareable(tag)

    def decode_special(self, subtype):
        """
        Decode a simple value or float (major type 7).

        :raises CBORDecodeValueError: if the subtype has no entry in
            ``special_decoders`` (e.g. 28-30)
        """
        # Simple value
        if subtype < 20:
            # XXX Set shareable?
            return CBORSimpleValue(subtype)
        try:
            special_decoder = special_decoders[subtype]
        except KeyError:
            # Report malformed input as a decoding error rather than leaking
            # the lookup's KeyError to the caller.
            raise CBORDecodeValueError(
                'undefined reserved major type 7 subtype 0x%x' % subtype
            ) from None
        return special_decoder(self)

    #
    # Semantic decoders (major tag 6)
    #

    def decode_datetime_string(self):
        """
        Decode a date/time string (semantic tag 0) into an aware ``datetime``.

        :raises CBORDecodeValueError: if the string does not match
            ``timestamp_re``
        """
        value = self._decode()
        match = timestamp_re.match(value)
        if match:
            (
                year,
                month,
                day,
                hour,
                minute,
                second,
                secfrac,
                offset_h,
                offset_m,
            ) = match.groups()
            if secfrac is None:
                microsecond = 0
            else:
                # Right-pad to six digits: ".5" means 500000 microseconds.
                microsecond = int('{:<06}'.format(secfrac))
            if offset_h:
                # The sign is part of the hours group but applies to the whole
                # offset: "-05:30" is minus (5h 30m), and "-00:30" is still
                # negative even though int("-00") == 0.
                offset = timedelta(
                    hours=abs(int(offset_h)), minutes=int(offset_m))
                if offset_h.startswith('-'):
                    offset = -offset
                tz = timezone(offset)
            else:
                tz = timezone.utc
            return self.set_shareable(datetime(
                int(year), int(month), int(day),
                int(hour), int(minute), int(second), microsecond, tz))
        else:
            raise CBORDecodeValueError(
                'invalid datetime string: {!r}'.format(value))

    def decode_epoch_datetime(self):
        """
        Decode an epoch timestamp (semantic tag 1) into a UTC ``datetime``.

        :raises CBORDecodeValueError: if the timestamp cannot be represented
        """
        value = self._decode()
        try:
            moment = datetime.fromtimestamp(value, timezone.utc)
        except (OverflowError, OSError, ValueError) as exc:
            raise CBORDecodeValueError(
                'error decoding datetime from epoch') from exc
        return self.set_shareable(moment)

    def decode_positive_bignum(self):
        """Decode a big-endian byte string (semantic tag 2) as an integer."""
        from binascii import hexlify
        value = self._decode()
        return self.set_shareable(int(hexlify(value), 16))

    def decode_negative_bignum(self):
        """Decode a negative bignum (semantic tag 3) as ``-n - 1``."""
        return self.set_shareable(-self.decode_positive_bignum() - 1)

    def decode_fraction(self):
        """Decode an ``[exponent, significand]`` pair (tag 4), base 10."""
        from decimal import Decimal
        exp, sig = self._decode()
        return self.set_shareable(Decimal(sig) * (10 ** Decimal(exp)))

    def decode_bigfloat(self):
        """Decode an ``[exponent, significand]`` pair (tag 5), base 2."""
        from decimal import Decimal
        exp, sig = self._decode()
        return self.set_shareable(Decimal(sig) * (2 ** Decimal(exp)))

    def decode_shareable(self):
        """
        Decode a value marked as shareable (semantic tag 28).

        A new slot is reserved in the shareables list; the nested decoder
        fills it in through :meth:`set_shareable`.
        """
        old_index = self._share_index
        self._share_index = len(self._shareables)
        self._shareables.append(None)
        try:
            return self._decode()
        finally:
            self._share_index = old_index

    def decode_sharedref(self):
        """
        Resolve a reference to a previously shared value (semantic tag 29).

        :raises CBORDecodeValueError: if the index is not a non-negative
            integer, is out of range, or refers to a slot not yet filled in
        """
        value = self._decode(unshared=True)
        if not isinstance(value, int) or value < 0:
            # A negative index would otherwise silently wrap around to the
            # end of the list; a non-integer would raise TypeError.
            raise CBORDecodeValueError(
                'invalid shared reference %r' % (value,))
        try:
            shared = self._shareables[value]
        except IndexError:
            raise CBORDecodeValueError('shared reference %d not found' % value)
        if shared is None:
            raise CBORDecodeValueError(
                'shared value %d has not been initialized' % value)
        else:
            return shared

    def decode_rational(self):
        """Decode a ``[numerator, denominator]`` pair (semantic tag 30)."""
        from fractions import Fraction
        return self.set_shareable(Fraction(*self._decode()))

    def decode_regexp(self):
        """Decode a regular expression (semantic tag 35) via ``re.compile``."""
        return self.set_shareable(re.compile(self._decode()))

    def decode_mime(self):
        """Decode a MIME message (semantic tag 36) with ``email.parser``."""
        from email.parser import Parser
        return self.set_shareable(Parser().parsestr(self._decode()))

    def decode_uuid(self):
        """Decode a UUID (semantic tag 37) from its bytes."""
        from uuid import UUID
        return self.set_shareable(UUID(bytes=self._decode()))

    def decode_set(self):
        """
        Decode a set (semantic tag 258).

        Members are decoded with the immutable flag set. The result is a
        ``frozenset`` when the set itself is needed in an immutable context,
        otherwise a ``set``.
        """
        if self._immutable:
            return self.set_shareable(frozenset(self._decode(immutable=True)))
        else:
            return self.set_shareable(set(self._decode(immutable=True)))

    def decode_ipaddress(self):
        """
        Decode an IP address (semantic tag 260).

        4- and 16-byte values become ``ipaddress`` objects. A 6-byte value is
        treated as a MAC address and returned wrapped in a :class:`CBORTag`.

        :raises CBORDecodeValueError: for any other value
        """
        from ipaddress import ip_address
        buf = self.decode()
        if not isinstance(buf, bytes) or len(buf) not in (4, 6, 16):
            # Wrapped in a 1-tuple so that a decoded tuple is formatted
            # instead of being expanded as the % arguments.
            raise CBORDecodeValueError("invalid ipaddress value %r" % (buf,))
        if len(buf) == 6:
            # MAC address
            return self.set_shareable(CBORTag(260, buf))
        return self.set_shareable(ip_address(buf))

    def decode_ipnetwork(self):
        """
        Decode an IP network (semantic tag 261).

        The payload must be a mapping with exactly one item, which is passed
        to ``ipaddress.ip_network`` with ``strict=False``.

        :raises CBORDecodeValueError: if the payload has another shape or is
            rejected by ``ip_network``
        """
        from ipaddress import ip_network
        net_map = self.decode()
        if isinstance(net_map, Mapping) and len(net_map) == 1:
            (net,) = net_map.items()
            try:
                return self.set_shareable(ip_network(net, strict=False))
            except (TypeError, ValueError):
                pass  # fall through to the shared error below
        raise CBORDecodeValueError("invalid ipnetwork value %r" % (net_map,))

    def decode_self_describe_cbor(self):
        """Decode the item wrapped by semantic tag 55799 and return it as is."""
        return self._decode()

    #
    # Special decoders (major tag 7)
    #

    def decode_simple_value(self):
        """Decode a simple value stored in the following byte."""
        # XXX Set shareable?
        return CBORSimpleValue(self.read(1)[0])

    def decode_float16(self):
        """Decode a big-endian half-precision float."""
        payload = self.read(2)
        value = struct.unpack('>e', payload)[0]
        return self.set_shareable(value)

    def decode_float32(self):
        """Decode a big-endian single-precision float."""
        return self.set_shareable(struct.unpack('>f', self.read(4))[0])

    def decode_float64(self):
        """Decode a big-endian double-precision float."""
        return self.set_shareable(struct.unpack('>d', self.read(8))[0])


# Dispatch on the major type (top three bits of the initial byte).
major_decoders = {
    0: CBORDecoder.decode_uint,
    1: CBORDecoder.decode_negint,
    2: CBORDecoder.decode_bytestring,
    3: CBORDecoder.decode_string,
    4: CBORDecoder.decode_array,
    5: CBORDecoder.decode_map,
    6: CBORDecoder.decode_semantic,
    7: CBORDecoder.decode_special,
}

# Dispatch on the subtype of a major type 7 item (subtypes below 20 are
# handled directly in ``decode_special``).
special_decoders = {
    20: lambda self: False,
    21: lambda self: True,
    22: lambda self: None,
    23: lambda self: undefined,
    24: CBORDecoder.decode_simple_value,
    25: CBORDecoder.decode_float16,
    26: CBORDecoder.decode_float32,
    27: CBORDecoder.decode_float64,
    31: lambda self: break_marker,
}

# Dispatch on the tag number of a major type 6 item.
semantic_decoders = {
    0:     CBORDecoder.decode_datetime_string,
    1:     CBORDecoder.decode_epoch_datetime,
    2:     CBORDecoder.decode_positive_bignum,
    3:     CBORDecoder.decode_negative_bignum,
    4:     CBORDecoder.decode_fraction,
    5:     CBORDecoder.decode_bigfloat,
    28:    CBORDecoder.decode_shareable,
    29:    CBORDecoder.decode_sharedref,
    30:    CBORDecoder.decode_rational,
    35:    CBORDecoder.decode_regexp,
    36:    CBORDecoder.decode_mime,
    37:    CBORDecoder.decode_uuid,
    258:   CBORDecoder.decode_set,
    260:   CBORDecoder.decode_ipaddress,
    261:   CBORDecoder.decode_ipnetwork,
    55799: CBORDecoder.decode_self_describe_cbor,
}


def loads(s, **kwargs):
    """
    Deserialize an object from a bytestring.

    :param bytes s:
        the bytestring to deserialize
    :param kwargs:
        keyword arguments passed to :class:`CBORDecoder`
    :return:
        the deserialized object
    """
    with BytesIO(s) as fp:
        return CBORDecoder(fp, **kwargs).decode()


def load(fp, **kwargs):
    """
    Deserialize an object from an open file.

    :param fp:
        the input file (any file-like object)
    :param kwargs:
        keyword arguments passed to :class:`CBORDecoder`
    :return:
        the deserialized object
    """
    # NOTE(review): the body of this function is cut off ("...") in the source
    # listing; the line below is reconstructed by analogy with ``loads`` and
    # the docstring above -- confirm against the original module.
    return CBORDecoder(fp, **kwargs).decode()
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
