Best Python code snippet using localstack_python
rados.py
Source:rados.py  
...129    def run(self):130        self.retval = self.target(*self.args)131# time in seconds between each call to t.join() for child thread132POLL_TIME_INCR = 0.5133def run_in_thread(target, args, timeout=0):134    interrupt = False135    countdown = timeout136    t = RadosThread(target, args)137    # allow the main thread to exit (presumably, avoid a join() on this138    # subthread) before this thread terminates.  This allows SIGINT139    # exit of a blocked call.  See below.140    t.daemon = True141    t.start()142    try:143        # poll for thread exit144        while t.is_alive():145            t.join(POLL_TIME_INCR)146            if timeout and t.is_alive():147                countdown = countdown - POLL_TIME_INCR148                if countdown <= 0:149                    raise KeyboardInterrupt150        t.join()        # in case t exits before reaching the join() above151    except KeyboardInterrupt:152        # ..but allow SIGINT to terminate the waiting.  Note: this153        # relies on the Linux kernel behavior of delivering the signal154        # to the main thread in preference to any subthread (all that's155        # strictly guaranteed is that *some* thread that has the signal156        # unblocked will receive it).  
But there doesn't seem to be157        # any interface to create t with SIGINT blocked.158        interrupt = True159    if interrupt:160        t.retval = -errno.EINTR161    return t.retval162# helper to specify an optional argument, where in addition to `cls`, `None`163# is also acceptable164def opt(cls):165    return (cls, None)166# validate argument types of an instance method167# kwargs is an un-ordered dict, so use args instead168def requires(*types):169    def is_type_of(v, t):170        if t is None:171            return v is None172        else:173            return isinstance(v, t)174    def check_type(val, arg_name, arg_type):175        if isinstance(arg_type, tuple):176            if any(is_type_of(val, t) for t in arg_type):177                return178            type_names = ' or '.join('None' if t is None else t.__name__179                                     for t in arg_type)180            raise TypeError('%s must be %s' % (arg_name, type_names))181        else:182            if is_type_of(val, arg_type):183                return184            assert(arg_type is not None)185            raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))186    def wrapper(f):187        @wraps(f)188        def validate_func(*args, **kwargs):189            # ignore the `self` arg190            pos_args = zip(args[1:], types)191            named_args = ((kwargs[name], (name, spec)) for name, spec in types192                          if name in kwargs)193            for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):194                check_type(arg_val, arg_name, arg_type)195            return f(*args, **kwargs)196        return validate_func197    return wrapper198class Rados(object):199    """librados python wrapper"""200    def require_state(self, *args):201        """202        Checks if the Rados object is in a special state203        :raises: RadosStateError204        """205        if self.state in args:206            return207        
raise RadosStateError("You cannot perform that operation on a \208Rados object in state %s." % self.state)209    @requires(('rados_id', opt(str)), ('name', opt(str)), ('clustername', opt(str)),210              ('conffile', opt(str)))211    def __init__(self, rados_id=None, name=None, clustername=None,212                 conf_defaults=None, conffile=None, conf=None, flags=0):213        library_path = find_library('rados')214        # maybe find_library can not find it correctly on all platforms,215        # so fall back to librados.so.2 in such case.216        self.librados = CDLL(library_path if library_path is not None else 'librados.so.2')217        self.parsed_args = []218        self.conf_defaults = conf_defaults219        self.conffile = conffile220        self.cluster = c_void_p()221        self.rados_id = rados_id222        if rados_id and name:223            raise Error("Rados(): can't supply both rados_id and name")224        elif rados_id:225            name = 'client.' + rados_id226        elif name is None:227            name = 'client.admin'228        if clustername is None:229            clustername = 'ceph'230        ret = run_in_thread(self.librados.rados_create2,231                            (byref(self.cluster), c_char_p(clustername),232                             c_char_p(name), c_uint64(flags)))233        if ret != 0:234            raise Error("rados_initialize failed with error code: %d" % ret)235        self.state = "configuring"236        # order is important: conf_defaults, then conffile, then conf237        if conf_defaults:238            for key, value in conf_defaults.iteritems():239                self.conf_set(key, value)240        if conffile is not None:241            # read the default conf file when '' is given242            if conffile == '':243                conffile = None244            self.conf_read_file(conffile)245        if conf:246            for key, value in conf.iteritems():247                self.conf_set(key, 
value)248    def shutdown(self):249        """250        Disconnects from the cluster.  Call this explicitly when a251        Rados.connect()ed object is no longer used.252        """253        if hasattr(self, "state") and self.state != "shutdown":254            run_in_thread(self.librados.rados_shutdown, (self.cluster,))255            self.state = "shutdown"256    def __enter__(self):257        self.connect()258        return self259    def __exit__(self, type_, value, traceback):260        self.shutdown()261        return False262    def version(self):263        """264        Get the version number of the ``librados`` C library.265        :returns: a tuple of ``(major, minor, extra)`` components of the266                  librados version267        """268        major = c_int(0)269        minor = c_int(0)270        extra = c_int(0)271        run_in_thread(self.librados.rados_version,272                      (byref(major), byref(minor), byref(extra)))273        return Version(major.value, minor.value, extra.value)274    @requires(('path', opt(str)))275    def conf_read_file(self, path=None):276        """277        Configure the cluster handle using a Ceph config file.278        :param path: path to the config file279        :type path: str280        """281        self.require_state("configuring", "connected")282        ret = run_in_thread(self.librados.rados_conf_read_file,283                            (self.cluster, c_char_p(path)))284        if (ret != 0):285            raise make_ex(ret, "error calling conf_read_file")286    def conf_parse_argv(self, args):287        """288        Parse known arguments from args, and remove; returned289        args contain only those unknown to ceph290        """291        self.require_state("configuring", "connected")292        if not args:293            return294        # create instances of arrays of c_char_p's, both len(args) long295        # cretargs will always be a subset of cargs (perhaps identical)296        cargs = 
(c_char_p * len(args))(*args)297        cretargs = (c_char_p * len(args))()298        ret = run_in_thread(self.librados.rados_conf_parse_argv_remainder,299                            (self.cluster, len(args), cargs, cretargs))300        if ret:301            raise make_ex(ret, "error calling conf_parse_argv_remainder")302        # cretargs was allocated with fixed length; collapse return303        # list to eliminate any missing args304        retargs = [a for a in cretargs if a is not None]305        self.parsed_args = args306        return retargs307    def conf_parse_env(self, var='CEPH_ARGS'):308        """309        Parse known arguments from an environment variable, normally310        CEPH_ARGS.311        """312        self.require_state("configuring", "connected")313        if not var:314            return315        ret = run_in_thread(self.librados.rados_conf_parse_env,316                            (self.cluster, c_char_p(var)))317        if (ret != 0):318            raise make_ex(ret, "error calling conf_parse_env")319    @requires(('option', str))320    def conf_get(self, option):321        """322        Get the value of a configuration option323        :param option: which option to read324        :type option: str325        :returns: str - value of the option or None326        :raises: :class:`TypeError`327        """328        self.require_state("configuring", "connected")329        length = 20330        while True:331            ret_buf = create_string_buffer(length)332            ret = run_in_thread(self.librados.rados_conf_get,333                                (self.cluster, c_char_p(option), ret_buf,334                                 c_size_t(length)))335            if (ret == 0):336                return ret_buf.value337            elif (ret == -errno.ENAMETOOLONG):338                length = length * 2339            elif (ret == -errno.ENOENT):340                return None341            else:342                raise make_ex(ret, "error 
calling conf_get")343    @requires(('option', str), ('val', str))344    def conf_set(self, option, val):345        """346        Set the value of a configuration option347        :param option: which option to set348        :type option: str349        :param option: value of the option350        :type option: str351        :raises: :class:`TypeError`, :class:`ObjectNotFound`352        """353        self.require_state("configuring", "connected")354        ret = run_in_thread(self.librados.rados_conf_set,355                            (self.cluster, c_char_p(option), c_char_p(val)))356        if (ret != 0):357            raise make_ex(ret, "error calling conf_set")358    def ping_monitor(self, mon_id):359        """360        Ping a monitor to assess liveness361        May be used as a simply way to assess liveness, or to obtain362        information about the monitor in a simple way even in the363        absence of quorum.364        :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)365        :type mon_id: str366        :returns: the string reply from the monitor367        """368        self.require_state("configuring", "connected")369        outstrp = pointer(pointer(c_char()))370        outstrlen = c_long()371        ret = run_in_thread(self.librados.rados_ping_monitor,372                            (self.cluster, c_char_p(mon_id),373                             outstrp, byref(outstrlen)))374        my_outstr = outstrp.contents[:(outstrlen.value)]375        if outstrlen.value:376            run_in_thread(self.librados.rados_buffer_free, (outstrp.contents,))377        if ret != 0:378            raise make_ex(ret, "error calling ping_monitor")379        return my_outstr380    def connect(self, timeout=0):381        """382        Connect to the cluster.  
Use shutdown() to release resources.383        """384        self.require_state("configuring")385        ret = run_in_thread(self.librados.rados_connect, (self.cluster,),386                            timeout)387        if (ret != 0):388            raise make_ex(ret, "error connecting to the cluster")389        self.state = "connected"390    def get_cluster_stats(self):391        """392        Read usage info about the cluster393        This tells you total space, space used, space available, and number394        of objects. These are not updated immediately when data is written,395        they are eventually consistent.396        :returns: dict - contains the following keys:397            - ``kb`` (int) - total space398            - ``kb_used`` (int) - space used399            - ``kb_avail`` (int) - free space available400            - ``num_objects`` (int) - number of objects401        """402        stats = rados_cluster_stat_t()403        ret = run_in_thread(self.librados.rados_cluster_stat,404                            (self.cluster, byref(stats)))405        if ret < 0:406            raise make_ex(407                ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)408        return {'kb': stats.kb,409                'kb_used': stats.kb_used,410                'kb_avail': stats.kb_avail,411                'num_objects': stats.num_objects}412    @requires(('pool_name', str))413    def pool_exists(self, pool_name):414        """415        Checks if a given pool exists.416        :param pool_name: name of the pool to check417        :type pool_name: str418        :raises: :class:`TypeError`, :class:`Error`419        :returns: bool - whether the pool exists, false otherwise.420        """421        self.require_state("connected")422        ret = run_in_thread(self.librados.rados_pool_lookup,423                            (self.cluster, c_char_p(pool_name)))424        if (ret >= 0):425            return True426        elif (ret == 
-errno.ENOENT):427            return False428        else:429            raise make_ex(ret, "error looking up pool '%s'" % pool_name)430    @requires(('pool_name', str))431    def pool_lookup(self, pool_name):432        """433        Returns a pool's ID based on its name.434        :param pool_name: name of the pool to look up435        :type pool_name: str436        :raises: :class:`TypeError`, :class:`Error`437        :returns: int - pool ID, or None if it doesn't exist438        """439        self.require_state("connected")440        ret = run_in_thread(self.librados.rados_pool_lookup,441                            (self.cluster, c_char_p(pool_name)))442        if (ret >= 0):443            return int(ret)444        elif (ret == -errno.ENOENT):445            return None446        else:447            raise make_ex(ret, "error looking up pool '%s'" % pool_name)448    @requires(('pool_id', int))449    def pool_reverse_lookup(self, pool_id):450        """451        Returns a pool's name based on its ID.452        :param pool_id: ID of the pool to look up453        :type pool_id: int454        :raises: :class:`TypeError`, :class:`Error`455        :returns: string - pool name, or None if it doesn't exist456        """457        self.require_state("connected")458        size = c_size_t(512)459        while True:460            c_name = create_string_buffer(size.value)461            ret = run_in_thread(self.librados.rados_pool_reverse_lookup,462                                (self.cluster, c_int64(pool_id), byref(c_name), size))463            if ret > size.value:464                size = c_size_t(ret)465            elif ret == -errno.ENOENT:466                return None467            elif ret < 0:468                raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)469            else:470                return c_name.value471                break472    @requires(('pool_name', str), ('auid', opt(int)), ('crush_rule', opt(int)))473    def create_pool(self, 
pool_name, auid=None, crush_rule=None):474        """475        Create a pool:476        - with default settings: if auid=None and crush_rule=None477        - owned by a specific auid: auid given and crush_rule=None478        - with a specific CRUSH rule: if auid=None and crush_rule given479        - with a specific CRUSH rule and auid: if auid and crush_rule given480        :param pool_name: name of the pool to create481        :type pool_name: str482        :param auid: the id of the owner of the new pool483        :type auid: int484        :param crush_rule: rule to use for placement in the new pool485        :type crush_rule: int486        :raises: :class:`TypeError`, :class:`Error`487        """488        self.require_state("connected")489        if auid is None:490            if crush_rule is None:491                ret = run_in_thread(self.librados.rados_pool_create,492                                    (self.cluster, c_char_p(pool_name)))493            else:494                ret = run_in_thread(self.librados.495                                    rados_pool_create_with_crush_rule,496                                    (self.cluster, c_char_p(pool_name),497                                     c_ubyte(crush_rule)))498        elif crush_rule is None:499            ret = run_in_thread(self.librados.rados_pool_create_with_auid,500                                (self.cluster, c_char_p(pool_name),501                                 c_uint64(auid)))502        else:503            ret = run_in_thread(self.librados.rados_pool_create_with_all,504                                (self.cluster, c_char_p(pool_name),505                                 c_uint64(auid), c_ubyte(crush_rule)))506        if ret < 0:507            raise make_ex(ret, "error creating pool '%s'" % pool_name)508    @requires(('pool_id', int))509    def get_pool_base_tier(self, pool_id):510        """511        Get base pool512        :returns: base pool, or pool_id if tiering is not configured for 
the pool513        """514        self.require_state("connected")515        base_tier = c_int64(0)516        ret = run_in_thread(self.librados.rados_pool_get_base_tier,517                            (self.cluster, c_int64(pool_id), byref(base_tier)))518        if ret < 0:519            raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)520        return base_tier.value521    @requires(('pool_name', str))522    def delete_pool(self, pool_name):523        """524        Delete a pool and all data inside it.525        The pool is removed from the cluster immediately,526        but the actual data is deleted in the background.527        :param pool_name: name of the pool to delete528        :type pool_name: str529        :raises: :class:`TypeError`, :class:`Error`530        """531        self.require_state("connected")532        ret = run_in_thread(self.librados.rados_pool_delete,533                            (self.cluster, c_char_p(pool_name)))534        if ret < 0:535            raise make_ex(ret, "error deleting pool '%s'" % pool_name)536    def list_pools(self):537        """538        Gets a list of pool names.539        :returns: list - of pool names.540        """541        self.require_state("connected")542        size = c_size_t(512)543        while True:544            c_names = create_string_buffer(size.value)545            ret = run_in_thread(self.librados.rados_pool_list,546                                (self.cluster, byref(c_names), size))547            if ret > size.value:548                size = c_size_t(ret)549            else:550                break551        return filter(lambda name: name != '', c_names.raw.split('\0'))552    def get_fsid(self):553        """554        Get the fsid of the cluster as a hexadecimal string.555        :raises: :class:`Error`556        :returns: str - cluster fsid557        """558        self.require_state("connected")559        buf_len = 37560        fsid = create_string_buffer(buf_len)561        ret = 
run_in_thread(self.librados.rados_cluster_fsid,562                            (self.cluster, byref(fsid), c_size_t(buf_len)))563        if ret < 0:564            raise make_ex(ret, "error getting cluster fsid")565        return fsid.value566    @requires(('ioctx_name', str))567    def open_ioctx(self, ioctx_name):568        """569        Create an io context570        The io context allows you to perform operations within a particular571        pool.572        :param ioctx_name: name of the pool573        :type ioctx_name: str574        :raises: :class:`TypeError`, :class:`Error`575        :returns: Ioctx - Rados Ioctx object576        """577        self.require_state("connected")578        ioctx = c_void_p()579        ret = run_in_thread(self.librados.rados_ioctx_create,580                            (self.cluster, c_char_p(ioctx_name), byref(ioctx)))581        if ret < 0:582            raise make_ex(ret, "error opening pool '%s'" % ioctx_name)583        return Ioctx(ioctx_name, self.librados, ioctx)584    def mon_command(self, cmd, inbuf, timeout=0, target=None):585        """586        mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)587        returns (int ret, string outbuf, string outs)588        """589        self.require_state("connected")590        outbufp = pointer(pointer(c_char()))591        outbuflen = c_long()592        outsp = pointer(pointer(c_char()))593        outslen = c_long()594        cmdarr = (c_char_p * len(cmd))(*cmd)595        if target:596            ret = run_in_thread(self.librados.rados_mon_command_target,597                                (self.cluster, c_char_p(target), cmdarr,598                                 len(cmd), c_char_p(inbuf), len(inbuf),599                                 outbufp, byref(outbuflen), outsp,600                                 byref(outslen)), timeout)601        else:602            ret = run_in_thread(self.librados.rados_mon_command,603                                (self.cluster, cmdarr, 
len(cmd),604                                 c_char_p(inbuf), len(inbuf),605                                 outbufp, byref(outbuflen), outsp, byref(outslen)),606                                timeout)607        # copy returned memory (ctypes makes a copy, not a reference)608        my_outbuf = outbufp.contents[:(outbuflen.value)]609        my_outs = outsp.contents[:(outslen.value)]610        # free callee's allocations611        if outbuflen.value:612            run_in_thread(self.librados.rados_buffer_free, (outbufp.contents,))613        if outslen.value:614            run_in_thread(self.librados.rados_buffer_free, (outsp.contents,))615        return (ret, my_outbuf, my_outs)616    def osd_command(self, osdid, cmd, inbuf, timeout=0):617        """618        osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)619        returns (int ret, string outbuf, string outs)620        """621        self.require_state("connected")622        outbufp = pointer(pointer(c_char()))623        outbuflen = c_long()624        outsp = pointer(pointer(c_char()))625        outslen = c_long()626        cmdarr = (c_char_p * len(cmd))(*cmd)627        ret = run_in_thread(self.librados.rados_osd_command,628                            (self.cluster, osdid, cmdarr, len(cmd),629                             c_char_p(inbuf), len(inbuf),630                             outbufp, byref(outbuflen), outsp, byref(outslen)),631                            timeout)632        # copy returned memory (ctypes makes a copy, not a reference)633        my_outbuf = outbufp.contents[:(outbuflen.value)]634        my_outs = outsp.contents[:(outslen.value)]635        # free callee's allocations636        if outbuflen.value:637            run_in_thread(self.librados.rados_buffer_free, (outbufp.contents,))638        if outslen.value:639            run_in_thread(self.librados.rados_buffer_free, (outsp.contents,))640        return (ret, my_outbuf, my_outs)641    def pg_command(self, pgid, cmd, inbuf, 
timeout=0):642        """643        pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)644        returns (int ret, string outbuf, string outs)645        """646        self.require_state("connected")647        outbufp = pointer(pointer(c_char()))648        outbuflen = c_long()649        outsp = pointer(pointer(c_char()))650        outslen = c_long()651        cmdarr = (c_char_p * len(cmd))(*cmd)652        ret = run_in_thread(self.librados.rados_pg_command,653                            (self.cluster, c_char_p(pgid), cmdarr, len(cmd),654                             c_char_p(inbuf), len(inbuf),655                             outbufp, byref(outbuflen), outsp, byref(outslen)),656                            timeout)657        # copy returned memory (ctypes makes a copy, not a reference)658        my_outbuf = outbufp.contents[:(outbuflen.value)]659        my_outs = outsp.contents[:(outslen.value)]660        # free callee's allocations661        if outbuflen.value:662            run_in_thread(self.librados.rados_buffer_free, (outbufp.contents,))663        if outslen.value:664            run_in_thread(self.librados.rados_buffer_free, (outsp.contents,))665        return (ret, my_outbuf, my_outs)666    def wait_for_latest_osdmap(self):667        self.require_state("connected")668        return run_in_thread(self.librados.rados_wait_for_latest_osdmap, (self.cluster,))669    def blacklist_add(self, client_address, expire_seconds=0):670        """671        Blacklist a client from the OSDs672        :param client_address: client address673        :type client_address: str674        :param expire_seconds: number of seconds to blacklist675        :type expire_seconds: int676        :raises: :class:`Error`677        """678        self.require_state("connected")679        ret = run_in_thread(self.librados.rados_blacklist_add,680                            (self.cluster, c_char_p(client_address),681                             c_uint32(expire_seconds)))682        if ret < 
0:683            raise make_ex(ret, "error blacklisting client '%s'" % client_address)684class OmapIterator(object):685    """Omap iterator"""686    def __init__(self, ioctx, ctx):687        self.ioctx = ioctx688        self.ctx = ctx689    def __iter__(self):690        return self691    def next(self):692        """693        Get the next key-value pair in the object694        :returns: next rados.OmapItem695        """696        key_ = c_char_p(0)697        val_ = c_char_p(0)698        len_ = c_int(0)699        ret = run_in_thread(self.ioctx.librados.rados_omap_get_next,700                      (self.ctx, byref(key_), byref(val_), byref(len_)))701        if (ret != 0):702            raise make_ex(ret, "error iterating over the omap")703        if key_.value is None:704            raise StopIteration()705        key = ctypes.string_at(key_)706        val = None707        if val_.value is not None:708            val = ctypes.string_at(val_, len_)709        return (key, val)710    def __del__(self):711        run_in_thread(self.ioctx.librados.rados_omap_get_end, (self.ctx,))712class ObjectIterator(object):713    """rados.Ioctx Object iterator"""714    def __init__(self, ioctx):715        self.ioctx = ioctx716        self.ctx = c_void_p()717        ret = run_in_thread(self.ioctx.librados.rados_nobjects_list_open,718                            (self.ioctx.io, byref(self.ctx)))719        if ret < 0:720            raise make_ex(ret, "error iterating over the objects in ioctx '%s'"721                          % self.ioctx.name)722    def __iter__(self):723        return self724    def next(self):725        """726        Get the next object name and locator in the pool727        :raises: StopIteration728        :returns: next rados.Ioctx Object729        """730        key = c_char_p()731        locator = c_char_p()732        nspace = c_char_p()733        ret = run_in_thread(self.ioctx.librados.rados_nobjects_list_next,734                            (self.ctx, byref(key), 
byref(locator), byref(nspace)))735        if ret < 0:736            raise StopIteration()737        return Object(self.ioctx, key.value, locator.value, nspace.value)738    def __del__(self):739        run_in_thread(self.ioctx.librados.rados_nobjects_list_close, (self.ctx,))740class XattrIterator(object):741    """Extended attribute iterator"""742    def __init__(self, ioctx, it, oid):743        self.ioctx = ioctx744        self.it = it745        self.oid = oid746    def __iter__(self):747        return self748    def next(self):749        """750        Get the next xattr on the object751        :raises: StopIteration752        :returns: pair - of name and value of the next Xattr753        """754        name_ = c_char_p(0)755        val_ = c_char_p(0)756        len_ = c_int(0)757        ret = run_in_thread(self.ioctx.librados.rados_getxattrs_next,758                            (self.it, byref(name_), byref(val_), byref(len_)))759        if (ret != 0):760            raise make_ex(ret, "error iterating over the extended attributes \761in '%s'" % self.oid)762        if name_.value is None:763            raise StopIteration()764        name = ctypes.string_at(name_)765        val = ctypes.string_at(val_, len_)766        return (name, val)767    def __del__(self):768        run_in_thread(self.ioctx.librados.rados_getxattrs_end, (self.it,))769class SnapIterator(object):770    """Snapshot iterator"""771    def __init__(self, ioctx):772        self.ioctx = ioctx773        # We don't know how big a buffer we need until we've called the774        # function. 
So use the exponential doubling strategy.775        num_snaps = 10776        while True:777            self.snaps = (ctypes.c_uint64 * num_snaps)()778            ret = run_in_thread(self.ioctx.librados.rados_ioctx_snap_list,779                                (self.ioctx.io, self.snaps, c_int(num_snaps)))780            if (ret >= 0):781                self.max_snap = ret782                break783            elif (ret != -errno.ERANGE):784                raise make_ex(ret, "error calling rados_snap_list for \785ioctx '%s'" % self.ioctx.name)786            num_snaps = num_snaps * 2787        self.cur_snap = 0788    def __iter__(self):789        return self790    def next(self):791        """792        Get the next Snapshot793        :raises: :class:`Error`, StopIteration794        :returns: Snap - next snapshot795        """796        if (self.cur_snap >= self.max_snap):797            raise StopIteration798        snap_id = self.snaps[self.cur_snap]799        name_len = 10800        while True:801            name = create_string_buffer(name_len)802            ret = run_in_thread(self.ioctx.librados.rados_ioctx_snap_get_name,803                                (self.ioctx.io, c_uint64(snap_id), byref(name),804                                 c_int(name_len)))805            if (ret == 0):806                name_len = ret807                break808            elif (ret != -errno.ERANGE):809                raise make_ex(ret, "rados_snap_get_name error")810            name_len = name_len * 2811        snap = Snap(self.ioctx, name.value, snap_id)812        self.cur_snap = self.cur_snap + 1813        return snap814class Snap(object):815    """Snapshot object"""816    def __init__(self, ioctx, name, snap_id):817        self.ioctx = ioctx818        self.name = name819        self.snap_id = snap_id820    def __str__(self):821        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \822            % (str(self.ioctx), self.name, self.snap_id)823    def get_timestamp(self):824     
   """825        Find when a snapshot in the current pool occurred826        :raises: :class:`Error`827        :returns: datetime - the data and time the snapshot was created828        """829        snap_time = c_long(0)830        ret = run_in_thread(self.ioctx.librados.rados_ioctx_snap_get_stamp,831                            (self.ioctx.io, self.snap_id, byref(snap_time)))832        if (ret != 0):833            raise make_ex(ret, "rados_ioctx_snap_get_stamp error")834        return datetime.fromtimestamp(snap_time.value)835class Completion(object):836    """completion object"""837    def __init__(self, ioctx, rados_comp, oncomplete, onsafe,838                 complete_cb, safe_cb):839        self.rados_comp = rados_comp840        self.oncomplete = oncomplete841        self.onsafe = onsafe842        self.ioctx = ioctx843        self.complete_cb = complete_cb844        self.safe_cb = safe_cb845    def is_safe(self):846        """847        Is an asynchronous operation safe?848        This does not imply that the safe callback has finished.849        :returns: True if the operation is safe850        """851        return run_in_thread(self.ioctx.librados.rados_aio_is_safe,852                             (self.rados_comp,)) == 1853    def is_complete(self):854        """855        Has an asynchronous operation completed?856        This does not imply that the safe callback has finished.857        :returns: True if the operation is completed858        """859        return run_in_thread(self.ioctx.librados.rados_aio_is_complete,860                             (self.rados_comp,)) == 1861    def wait_for_safe(self):862        """863        Wait for an asynchronous operation to be marked safe864        This does not imply that the safe callback has finished.865        """866        run_in_thread(self.ioctx.librados.rados_aio_wait_for_safe,867                      (self.rados_comp,))868    def wait_for_complete(self):869        """870        Wait for an asynchronous 
operation to complete871        This does not imply that the complete callback has finished.872        """873        run_in_thread(self.ioctx.librados.rados_aio_wait_for_complete,874                      (self.rados_comp,))875    def wait_for_safe_and_cb(self):876        """877        Wait for an asynchronous operation to be marked safe and for878        the safe callback to have returned879        """880        run_in_thread(self.ioctx.librados.rados_aio_wait_for_safe_and_cb,881                      (self.rados_comp,))882    def wait_for_complete_and_cb(self):883        """884        Wait for an asynchronous operation to complete and for the885        complete callback to have returned886        :returns:  whether the operation is completed887        """888        return run_in_thread(889            self.ioctx.librados.rados_aio_wait_for_complete_and_cb,890            (self.rados_comp,)891        )892    def get_return_value(self):893        """894        Get the return value of an asychronous operation895        The return value is set when the operation is complete or safe,896        whichever comes first.897        :returns: int - return value of the operation898        """899        return run_in_thread(self.ioctx.librados.rados_aio_get_return_value,900                             (self.rados_comp,))901    def __del__(self):902        """903        Release a completion904        Call this when you no longer need the completion. 
It may not be905        freed immediately if the operation is not acked and committed.906        """907        run_in_thread(self.ioctx.librados.rados_aio_release,908                      (self.rados_comp,))909class WriteOpCtx(object):910    """write operation context manager"""911    def __init__(self, ioctx):912        self.ioctx = ioctx913    def __enter__(self):914        self.ioctx.librados.rados_create_write_op.restype = c_void_p915        ret = run_in_thread(self.ioctx.librados.rados_create_write_op, (None,))916        self.write_op = ret917        return ret918    def __exit__(self, type, msg, traceback):919        self.ioctx.librados.rados_release_write_op.argtypes = [c_void_p]920        run_in_thread(self.ioctx.librados.rados_release_write_op, (c_void_p(self.write_op),))921class ReadOpCtx(object):922    """read operation context manager"""923    def __init__(self, ioctx):924        self.ioctx = ioctx925    def __enter__(self):926        self.ioctx.librados.rados_create_read_op.restype = c_void_p927        ret = run_in_thread(self.ioctx.librados.rados_create_read_op, (None,))928        self.read_op = ret929        return ret930    def __exit__(self, type, msg, traceback):931        self.ioctx.librados.rados_release_read_op.argtypes = [c_void_p]932        run_in_thread(self.ioctx.librados.rados_release_read_op, (c_void_p(self.read_op),))933RADOS_CB = CFUNCTYPE(c_int, c_void_p, c_void_p)934class Ioctx(object):935    """rados.Ioctx object"""936    def __init__(self, name, librados, io):937        self.name = name938        self.librados = librados939        self.io = io940        self.state = "open"941        self.locator_key = ""942        self.nspace = ""943        self.safe_cbs = {}944        self.complete_cbs = {}945        self.lock = threading.Lock()946    def __enter__(self):947        return self948    def __exit__(self, type_, value, traceback):949        self.close()950        return False951    def __del__(self):952        self.close()953    def 
__aio_safe_cb(self, completion, _):954        """955        Callback to onsafe() for asynchronous operations956        """957        cb = None958        with self.lock:959            cb = self.safe_cbs[completion]960            del self.safe_cbs[completion]961        cb.onsafe(cb)962        return 0963    def __aio_complete_cb(self, completion, _):964        """965        Callback to oncomplete() for asynchronous operations966        """967        cb = None968        with self.lock:969            cb = self.complete_cbs[completion]970            del self.complete_cbs[completion]971        cb.oncomplete(cb)972        return 0973    def __get_completion(self, oncomplete, onsafe):974        """975        Constructs a completion to use with asynchronous operations976        :param oncomplete: what to do when the write is safe and complete in memory977            on all replicas978        :type oncomplete: completion979        :param onsafe:  what to do when the write is safe and complete on storage980            on all replicas981        :type onsafe: completion982        :raises: :class:`Error`983        :returns: completion object984        """985        completion = c_void_p(0)986        complete_cb = None987        safe_cb = None988        if oncomplete:989            complete_cb = RADOS_CB(self.__aio_complete_cb)990        if onsafe:991            safe_cb = RADOS_CB(self.__aio_safe_cb)992        ret = run_in_thread(self.librados.rados_aio_create_completion,993                            (c_void_p(0), complete_cb, safe_cb,994                             byref(completion)))995        if ret < 0:996            raise make_ex(ret, "error getting a completion")997        with self.lock:998            completion_obj = Completion(self, completion, oncomplete, onsafe,999                                        complete_cb, safe_cb)1000            if oncomplete:1001                self.complete_cbs[completion.value] = completion_obj1002            if onsafe:1003               
 self.safe_cbs[completion.value] = completion_obj1004        return completion_obj1005    def aio_write(self, object_name, to_write, offset=0,1006                  oncomplete=None, onsafe=None):1007        """1008        Write data to an object asynchronously1009        Queues the write and returns.1010        :param object_name: name of the object1011        :type object_name: str1012        :param to_write: data to write1013        :type to_write: str1014        :param offset: byte offset in the object to begin writing at1015        :type offset: int1016        :param oncomplete: what to do when the write is safe and complete in memory1017            on all replicas1018        :type oncomplete: completion1019        :param onsafe:  what to do when the write is safe and complete on storage1020            on all replicas1021        :type onsafe: completion1022        :raises: :class:`Error`1023        :returns: completion object1024        """1025        completion = self.__get_completion(oncomplete, onsafe)1026        ret = run_in_thread(self.librados.rados_aio_write,1027                            (self.io, c_char_p(object_name),1028                             completion.rados_comp, c_char_p(to_write),1029                             c_size_t(len(to_write)), c_uint64(offset)))1030        if ret < 0:1031            raise make_ex(ret, "error writing object %s" % object_name)1032        return completion1033    def aio_write_full(self, object_name, to_write,1034                       oncomplete=None, onsafe=None):1035        """1036        Asychronously write an entire object1037        The object is filled with the provided data. 
If the object exists,1038        it is atomically truncated and then written.1039        Queues the write and returns.1040        :param object_name: name of the object1041        :type object_name: str1042        :param to_write: data to write1043        :type to_write: str1044        :param oncomplete: what to do when the write is safe and complete in memory1045            on all replicas1046        :type oncomplete: completion1047        :param onsafe:  what to do when the write is safe and complete on storage1048            on all replicas1049        :type onsafe: completion1050        :raises: :class:`Error`1051        :returns: completion object1052        """1053        completion = self.__get_completion(oncomplete, onsafe)1054        ret = run_in_thread(self.librados.rados_aio_write_full,1055                            (self.io, c_char_p(object_name),1056                             completion.rados_comp, c_char_p(to_write),1057                             c_size_t(len(to_write))))1058        if ret < 0:1059            raise make_ex(ret, "error writing object %s" % object_name)1060        return completion1061    def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):1062        """1063        Asychronously append data to an object1064        Queues the write and returns.1065        :param object_name: name of the object1066        :type object_name: str1067        :param to_append: data to append1068        :type to_append: str1069        :param offset: byte offset in the object to begin writing at1070        :type offset: int1071        :param oncomplete: what to do when the write is safe and complete in memory1072            on all replicas1073        :type oncomplete: completion1074        :param onsafe:  what to do when the write is safe and complete on storage1075            on all replicas1076        :type onsafe: completion1077        :raises: :class:`Error`1078        :returns: completion object1079        """1080        
completion = self.__get_completion(oncomplete, onsafe)1081        ret = run_in_thread(self.librados.rados_aio_append,1082                            (self.io, c_char_p(object_name),1083                             completion.rados_comp, c_char_p(to_append),1084                             c_size_t(len(to_append))))1085        if ret < 0:1086            raise make_ex(ret, "error appending to object %s" % object_name)1087        return completion1088    def aio_flush(self):1089        """1090        Block until all pending writes in an io context are safe1091        :raises: :class:`Error`1092        """1093        ret = run_in_thread(self.librados.rados_aio_flush, (self.io,))1094        if ret < 0:1095            raise make_ex(ret, "error flushing")1096    def aio_read(self, object_name, length, offset, oncomplete):1097        """1098        Asychronously read data from an object1099        oncomplete will be called with the returned read value as1100        well as the completion:1101        oncomplete(completion, data_read)1102        :param object_name: name of the object to read from1103        :type object_name: str1104        :param length: the number of bytes to read1105        :type length: int1106        :param offset: byte offset in the object to begin reading from1107        :type offset: int1108        :param oncomplete: what to do when the read is complete1109        :type oncomplete: completion1110        :raises: :class:`Error`1111        :returns: completion object1112        """1113        buf = create_string_buffer(length)1114        def oncomplete_(completion_v):1115            return_value = completion_v.get_return_value()1116            return oncomplete(completion_v,1117                              ctypes.string_at(buf, return_value) if return_value >= 0 else None)1118        completion = self.__get_completion(oncomplete_, None)1119        ret = run_in_thread(self.librados.rados_aio_read,1120                            (self.io, 
c_char_p(object_name),1121                             completion.rados_comp, buf, c_size_t(length),1122                             c_uint64(offset)))1123        if ret < 0:1124            raise make_ex(ret, "error reading %s" % object_name)1125        return completion1126    def aio_remove(self, object_name, oncomplete=None, onsafe=None):1127        """1128        Asychronously remove an object1129        :param object_name: name of the object to remove1130        :type object_name: str1131        :param oncomplete: what to do when the remove is safe and complete in memory1132            on all replicas1133        :type oncomplete: completion1134        :param onsafe:  what to do when the remove is safe and complete on storage1135            on all replicas1136        :type onsafe: completion1137        :raises: :class:`Error`1138        :returns: completion object1139        """1140        completion = self.__get_completion(oncomplete, onsafe)1141        ret = run_in_thread(self.librados.rados_aio_remove,1142                            (self.io, c_char_p(object_name),1143                             completion.rados_comp))1144        if ret < 0:1145            raise make_ex(ret, "error removing %s" % object_name)1146        return completion1147    def require_ioctx_open(self):1148        """1149        Checks if the rados.Ioctx object state is 'open'1150        :raises: IoctxStateError1151        """1152        if self.state != "open":1153            raise IoctxStateError("The pool is %s" % self.state)1154    def change_auid(self, auid):1155        """1156        Attempt to change an io context's associated auid "owner."1157        Requires that you have write permission on both the current and new1158        auid.1159        :raises: :class:`Error`1160        """1161        self.require_ioctx_open()1162        ret = run_in_thread(self.librados.rados_ioctx_pool_set_auid,1163                            (self.io, ctypes.c_uint64(auid)))1164        if ret < 
0:1165            raise make_ex(ret, "error changing auid of '%s' to %d"1166                          % (self.name, auid))1167    @requires(('loc_key', str))1168    def set_locator_key(self, loc_key):1169        """1170        Set the key for mapping objects to pgs within an io context.1171        The key is used instead of the object name to determine which1172        placement groups an object is put in. This affects all subsequent1173        operations of the io context - until a different locator key is1174        set, all objects in this io context will be placed in the same pg.1175        :param loc_key: the key to use as the object locator, or NULL to discard1176            any previously set key1177        :type loc_key: str1178        :raises: :class:`TypeError`1179        """1180        self.require_ioctx_open()1181        run_in_thread(self.librados.rados_ioctx_locator_set_key,1182                      (self.io, c_char_p(loc_key)))1183        self.locator_key = loc_key1184    def get_locator_key(self):1185        """1186        Get the locator_key of context1187        :returns: locator_key1188        """1189        return self.locator_key1190    @requires(('nspace', str))1191    def set_namespace(self, nspace):1192        """1193        Set the namespace for objects within an io context.1194        The namespace in addition to the object name fully identifies1195        an object. 
This affects all subsequent operations of the io context1196        - until a different namespace is set, all objects in this io context1197        will be placed in the same namespace.1198        :param nspace: the namespace to use, or None/"" for the default namespace1199        :type nspace: str1200        :raises: :class:`TypeError`1201        """1202        self.require_ioctx_open()1203        if nspace is None:1204            nspace = ""1205        run_in_thread(self.librados.rados_ioctx_set_namespace,1206                      (self.io, c_char_p(nspace)))1207        self.nspace = nspace1208    def get_namespace(self):1209        """1210        Get the namespace of context1211        :returns: namespace1212        """1213        return self.nspace1214    def close(self):1215        """1216        Close a rados.Ioctx object.1217        This just tells librados that you no longer need to use the io context.1218        It may not be freed immediately if there are pending asynchronous1219        requests on it, but you should not use an io context again after1220        calling this function on it.1221        """1222        if self.state == "open":1223            self.require_ioctx_open()1224            run_in_thread(self.librados.rados_ioctx_destroy, (self.io,))1225            self.state = "closed"1226    @requires(('key', str), ('data', str))1227    def write(self, key, data, offset=0):1228        """1229        Write data to an object synchronously1230        :param key: name of the object1231        :type key: str1232        :param data: data to write1233        :type data: str1234        :param offset: byte offset in the object to begin writing at1235        :type offset: int1236        :raises: :class:`TypeError`1237        :raises: :class:`LogicError`1238        :returns: int - 0 on success1239        """1240        self.require_ioctx_open()1241        length = len(data)1242        ret = run_in_thread(self.librados.rados_write,1243                           
 (self.io, c_char_p(key), c_char_p(data),1244                             c_size_t(length), c_uint64(offset)))1245        if ret == 0:1246            return ret1247        elif ret < 0:1248            raise make_ex(ret, "Ioctx.write(%s): failed to write %s"1249                          % (self.name, key))1250        else:1251            raise LogicError("Ioctx.write(%s): rados_write \1252returned %d, but should return zero on success." % (self.name, ret))1253    @requires(('key', str), ('data', str))1254    def write_full(self, key, data):1255        """1256        Write an entire object synchronously.1257        The object is filled with the provided data. If the object exists,1258        it is atomically truncated and then written.1259        :param key: name of the object1260        :type key: str1261        :param data: data to write1262        :type data: str1263        :raises: :class:`TypeError`1264        :raises: :class:`Error`1265        :returns: int - 0 on success1266        """1267        self.require_ioctx_open()1268        length = len(data)1269        ret = run_in_thread(self.librados.rados_write_full,1270                            (self.io, c_char_p(key), c_char_p(data),1271                             c_size_t(length)))1272        if ret == 0:1273            return ret1274        elif ret < 0:1275            raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"1276                          % (self.name, key))1277        else:1278            raise LogicError("Ioctx.write_full(%s): rados_write_full \1279returned %d, but should return zero on success." 
% (self.name, ret))1280    @requires(('key', str), ('data', str))1281    def append(self, key, data):1282        """1283        Append data to an object synchronously1284        :param key: name of the object1285        :type key: str1286        :param data: data to write1287        :type data: str1288        :raises: :class:`TypeError`1289        :raises: :class:`LogicError`1290        :returns: int - 0 on success1291        """1292        self.require_ioctx_open()1293        length = len(data)1294        ret = run_in_thread(self.librados.rados_append,1295                            (self.io, c_char_p(key), c_char_p(data),1296                             c_size_t(length)))1297        if ret == 0:1298            return ret1299        elif ret < 0:1300            raise make_ex(ret, "Ioctx.append(%s): failed to append %s"1301                          % (self.name, key))1302        else:1303            raise LogicError("Ioctx.append(%s): rados_append \1304returned %d, but should return zero on success." 
% (self.name, ret))1305    @requires(('key', str))1306    def read(self, key, length=8192, offset=0):1307        """1308        Read data from an object synchronously1309        :param key: name of the object1310        :type key: str1311        :param length: the number of bytes to read (default=8192)1312        :type length: int1313        :param offset: byte offset in the object to begin reading at1314        :type offset: int1315        :raises: :class:`TypeError`1316        :raises: :class:`Error`1317        :returns: str - data read from object1318        """1319        self.require_ioctx_open()1320        ret_buf = create_string_buffer(length)1321        ret = run_in_thread(self.librados.rados_read,1322                            (self.io, c_char_p(key), ret_buf, c_size_t(length),1323                             c_uint64(offset)))1324        if ret < 0:1325            raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))1326        return ctypes.string_at(ret_buf, ret)1327    def get_stats(self):1328        """1329        Get pool usage statistics1330        :returns: dict - contains the following keys:1331            - ``num_bytes`` (int) - size of pool in bytes1332            - ``num_kb`` (int) - size of pool in kbytes1333            - ``num_objects`` (int) - number of objects in the pool1334            - ``num_object_clones`` (int) - number of object clones1335            - ``num_object_copies`` (int) - number of object copies1336            - ``num_objects_missing_on_primary`` (int) - number of objets1337                missing on primary1338            - ``num_objects_unfound`` (int) - number of unfound objects1339            - ``num_objects_degraded`` (int) - number of degraded objects1340            - ``num_rd`` (int) - bytes read1341            - ``num_rd_kb`` (int) - kbytes read1342            - ``num_wr`` (int) - bytes written1343            - ``num_wr_kb`` (int) - kbytes written1344        """1345        
self.require_ioctx_open()1346        stats = rados_pool_stat_t()1347        ret = run_in_thread(self.librados.rados_ioctx_pool_stat,1348                            (self.io, byref(stats)))1349        if ret < 0:1350            raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)1351        return {'num_bytes': stats.num_bytes,1352                'num_kb': stats.num_kb,1353                'num_objects': stats.num_objects,1354                'num_object_clones': stats.num_object_clones,1355                'num_object_copies': stats.num_object_copies,1356                "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,1357                "num_objects_unfound": stats.num_objects_unfound,1358                "num_objects_degraded": stats.num_objects_degraded,1359                "num_rd": stats.num_rd,1360                "num_rd_kb": stats.num_rd_kb,1361                "num_wr": stats.num_wr,1362                "num_wr_kb": stats.num_wr_kb}1363    @requires(('key', str))1364    def remove_object(self, key):1365        """1366        Delete an object1367        This does not delete any snapshots of the object.1368        :param key: the name of the object to delete1369        :type key: str1370        :raises: :class:`TypeError`1371        :raises: :class:`Error`1372        :returns: bool - True on success1373        """1374        self.require_ioctx_open()1375        ret = run_in_thread(self.librados.rados_remove,1376                            (self.io, c_char_p(key)))1377        if ret < 0:1378            raise make_ex(ret, "Failed to remove '%s'" % key)1379        return True1380    @requires(('key', str))1381    def trunc(self, key, size):1382        """1383        Resize an object1384        If this enlarges the object, the new area is logically filled with1385        zeroes. 
If this shrinks the object, the excess data is removed.1386        :param key: the name of the object to resize1387        :type key: str1388        :param size: the new size of the object in bytes1389        :type size: int1390        :raises: :class:`TypeError`1391        :raises: :class:`Error`1392        :returns: int - 0 on success, otherwise raises error1393        """1394        self.require_ioctx_open()1395        ret = run_in_thread(self.librados.rados_trunc,1396                            (self.io, c_char_p(key), c_uint64(size)))1397        if ret < 0:1398            raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))1399        return ret1400    @requires(('key', str))1401    def stat(self, key):1402        """1403        Get object stats (size/mtime)1404        :param key: the name of the object to get stats from1405        :type key: str1406        :raises: :class:`TypeError`1407        :raises: :class:`Error`1408        :returns: (size,timestamp)1409        """1410        self.require_ioctx_open()1411        psize = c_uint64()1412        pmtime = c_uint64()1413        ret = run_in_thread(self.librados.rados_stat,1414                            (self.io, c_char_p(key), pointer(psize),1415                             pointer(pmtime)))1416        if ret < 0:1417            raise make_ex(ret, "Failed to stat %r" % key)1418        return psize.value, time.localtime(pmtime.value)1419    @requires(('key', str), ('xattr_name', str))1420    def get_xattr(self, key, xattr_name):1421        """1422        Get the value of an extended attribute on an object.1423        :param key: the name of the object to get xattr from1424        :type key: str1425        :param xattr_name: which extended attribute to read1426        :type xattr_name: str1427        :raises: :class:`TypeError`1428        :raises: :class:`Error`1429        :returns: str - value of the xattr1430        """1431        self.require_ioctx_open()1432        ret_length = 
40961433        while ret_length < 4096 * 1024 * 1024:1434            ret_buf = create_string_buffer(ret_length)1435            ret = run_in_thread(self.librados.rados_getxattr,1436                                (self.io, c_char_p(key), c_char_p(xattr_name),1437                                 ret_buf, c_size_t(ret_length)))1438            if (ret == -errno.ERANGE):1439                ret_length *= 21440            elif ret < 0:1441                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)1442            else:1443                break1444        return ctypes.string_at(ret_buf, ret)1445    @requires(('oid', str))1446    def get_xattrs(self, oid):1447        """1448        Start iterating over xattrs on an object.1449        :param oid: the name of the object to get xattrs from1450        :type key: str1451        :raises: :class:`TypeError`1452        :raises: :class:`Error`1453        :returns: XattrIterator1454        """1455        self.require_ioctx_open()1456        it = c_void_p(0)1457        ret = run_in_thread(self.librados.rados_getxattrs,1458                            (self.io, oid, byref(it)))1459        if ret != 0:1460            raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)1461        return XattrIterator(self, it, oid)1462    @requires(('key', str), ('xattr_name', str), ('xattr_value', str))1463    def set_xattr(self, key, xattr_name, xattr_value):1464        """1465        Set an extended attribute on an object.1466        :param key: the name of the object to set xattr to1467        :type key: str1468        :param xattr_name: which extended attribute to set1469        :type xattr_name: str1470        :param xattr_value: the value of the  extended attribute1471        :type xattr_value: str1472        :raises: :class:`TypeError`1473        :raises: :class:`Error`1474        :returns: bool - True on success, otherwise raise an error1475        """1476        self.require_ioctx_open()1477        ret = 
run_in_thread(self.librados.rados_setxattr,1478                            (self.io, c_char_p(key), c_char_p(xattr_name),1479                             c_char_p(xattr_value), c_size_t(len(xattr_value))))1480        if ret < 0:1481            raise make_ex(ret, "Failed to set xattr %r" % xattr_name)1482        return True1483    @requires(('key', str), ('xattr_name', str))1484    def rm_xattr(self, key, xattr_name):1485        """1486        Removes an extended attribute on from an object.1487        :param key: the name of the object to remove xattr from1488        :type key: str1489        :param xattr_name: which extended attribute to remove1490        :type xattr_name: str1491        :raises: :class:`TypeError`1492        :raises: :class:`Error`1493        :returns: bool - True on success, otherwise raise an error1494        """1495        self.require_ioctx_open()1496        ret = run_in_thread(self.librados.rados_rmxattr,1497                            (self.io, c_char_p(key), c_char_p(xattr_name)))1498        if ret < 0:1499            raise make_ex(ret, "Failed to delete key %r xattr %r" %1500                          (key, xattr_name))1501        return True1502    def list_objects(self):1503        """1504        Get ObjectIterator on rados.Ioctx object.1505        :returns: ObjectIterator1506        """1507        self.require_ioctx_open()1508        return ObjectIterator(self)1509    def list_snaps(self):1510        """1511        Get SnapIterator on rados.Ioctx object.1512        :returns: SnapIterator1513        """1514        self.require_ioctx_open()1515        return SnapIterator(self)1516    @requires(('snap_name', str))1517    def create_snap(self, snap_name):1518        """1519        Create a pool-wide snapshot1520        :param snap_name: the name of the snapshot1521        :type snap_name: str1522        :raises: :class:`TypeError`1523        :raises: :class:`Error`1524        """1525        self.require_ioctx_open()1526        ret = 
run_in_thread(self.librados.rados_ioctx_snap_create,1527                            (self.io, c_char_p(snap_name)))1528        if (ret != 0):1529            raise make_ex(ret, "Failed to create snap %s" % snap_name)1530    @requires(('snap_name', str))1531    def remove_snap(self, snap_name):1532        """1533        Removes a pool-wide snapshot1534        :param snap_name: the name of the snapshot1535        :type snap_name: str1536        :raises: :class:`TypeError`1537        :raises: :class:`Error`1538        """1539        self.require_ioctx_open()1540        ret = run_in_thread(self.librados.rados_ioctx_snap_remove,1541                            (self.io, c_char_p(snap_name)))1542        if (ret != 0):1543            raise make_ex(ret, "Failed to remove snap %s" % snap_name)1544    @requires(('snap_name', str))1545    def lookup_snap(self, snap_name):1546        """1547        Get the id of a pool snapshot1548        :param snap_name: the name of the snapshot to lookop1549        :type snap_name: str1550        :raises: :class:`TypeError`1551        :raises: :class:`Error`1552        :returns: Snap - on success1553        """1554        self.require_ioctx_open()1555        snap_id = c_uint64()1556        ret = run_in_thread(self.librados.rados_ioctx_snap_lookup,1557                            (self.io, c_char_p(snap_name), byref(snap_id)))1558        if (ret != 0):1559            raise make_ex(ret, "Failed to lookup snap %s" % snap_name)1560        return Snap(self, snap_name, snap_id)1561    def get_last_version(self):1562        """1563        Return the version of the last object read or written to.1564        This exposes the internal version number of the last object read or1565        written via this io context1566        :returns: version of the last object used1567        """1568        self.require_ioctx_open()1569        return run_in_thread(self.librados.rados_get_last_version, (self.io,))1570    def create_write_op(self):1571        """1572     
   create write operation object.1573        need call release_write_op after use1574        """1575        self.librados.rados_create_write_op.restype = c_void_p1576        return run_in_thread(self.librados.rados_create_write_op, (None,))1577    def create_read_op(self):1578        """1579        create read operation object.1580        need call release_read_op after use1581        """1582        self.librados.rados_create_read_op.restype = c_void_p1583        return run_in_thread(self.librados.rados_create_read_op, (None,))1584    def release_write_op(self, write_op):1585        """1586        release memory alloc by create_write_op1587        """1588        self.librados.rados_release_write_op.argtypes = [c_void_p]1589        run_in_thread(self.librados.rados_release_write_op, (c_void_p(write_op),))1590    def release_read_op(self, read_op):1591        """1592        release memory alloc by create_read_op1593        :para read_op: read_op object1594        :type: int1595        """1596        self.librados.rados_release_read_op.argtypes = [c_void_p]1597        run_in_thread(self.librados.rados_release_read_op, (c_void_p(read_op),))1598    @requires(('write_op', int), ('keys', tuple), ('values', tuple))1599    def set_omap(self, write_op, keys, values):1600        """1601        set keys values to write_op1602        :para write_op: write_operation object1603        :type write_op: int1604        :para keys: a tuple of keys1605        :type keys: tuple1606        :para values: a tuple of values1607        :type values: tuple1608        """1609        if len(keys) != len(values):1610            raise Error("Rados(): keys and values must have the same number of items")1611        key_num = len(keys)1612        key_array_type = c_char_p*key_num1613        key_array = key_array_type()1614        key_array[:] = keys1615        value_array_type = c_char_p*key_num1616        value_array = value_array_type()1617        value_array[:] = values1618        lens_array_type 
= c_size_t*key_num1619        lens_array = lens_array_type()1620        for index, value in enumerate(values):1621            lens_array[index] = c_size_t(len(value))1622        run_in_thread(self.librados.rados_write_op_omap_set,1623                      (c_void_p(write_op), byref(key_array), byref(value_array),1624                       byref(lens_array), c_int(key_num),))1625    @requires(('write_op', int), ('oid', str), ('mtime', opt(int)), ('flags', opt(int)))1626    def operate_write_op(self, write_op, oid, mtime=0, flags=0):1627        """1628        excute the real write operation1629        :para write_op: write operation object1630        :type write_op: int1631        :para oid: object name1632        :type oid: str1633        :para mtime: the time to set the mtime to, 0 for the current time1634        :type mtime: int1635        :para flags: flags to apply to the entire operation1636        :type flags: int1637        """1638        run_in_thread(self.librados.rados_write_op_operate,1639                      (c_void_p(write_op), self.io, c_char_p(oid),1640                       c_long(mtime), c_int(flags),))1641    @requires(('read_op', int), ('oid', str), ('flag', opt(int)))1642    def operate_read_op(self, read_op, oid, flag=0):1643        """1644        excute the real read operation1645        :para read_op: read operation object1646        :type read_op: int1647        :para oid: object name1648        :type oid: str1649        :para flag: flags to apply to the entire operation1650        :type flag: int1651        """1652        run_in_thread(self.librados.rados_read_op_operate,1653                      (c_void_p(read_op), self.io, c_char_p(oid), c_int(flag),))1654    @requires(('read_op', int), ('start_after', str), ('filter_prefix', str), ('max_return', int))1655    def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):1656        """1657        get the omap values1658        :para read_op: read operation object1659        
:type read_op: int1660        :para start_after: list keys starting after start_after1661        :type start_after: str1662        :para filter_prefix: list only keys beginning with filter_prefix1663        :type filter_prefix: str1664        :para max_return: list no more than max_return key/value pairs1665        :type max_return: int1666        :returns: an iterator over the the requested omap values, return value from this action1667        """1668        prval = c_int()1669        iter_addr = c_void_p()1670        run_in_thread(self.librados.rados_read_op_omap_get_vals,1671                      (c_void_p(read_op), c_char_p(start_after),1672                       c_char_p(filter_prefix), c_int(max_return),1673                       byref(iter_addr), pointer(prval)))1674        return OmapIterator(self, iter_addr), prval.value1675    @requires(('read_op', int), ('start_after', str), ('max_return', int))1676    def get_omap_keys(self, read_op, start_after, max_return):1677        """1678        get the omap keys1679        :para read_op: read operation object1680        :type read_op: int1681        :para start_after: list keys starting after start_after1682        :type start_after: str1683        :para max_return: list no more than max_return key/value pairs1684        :type max_return: int1685        :returns: an iterator over the the requested omap values, return value from this action1686        """1687        prval = c_int()1688        iter_addr = c_void_p()1689        run_in_thread(self.librados.rados_read_op_omap_get_keys,1690                      (c_void_p(read_op), c_char_p(start_after),1691                       c_int(max_return), byref(iter_addr), pointer(prval)))1692        return OmapIterator(self, iter_addr), prval.value1693    @requires(('read_op', int), ('keys', tuple))1694    def get_omap_vals_by_keys(self, read_op, keys):1695        """1696        get the omap values by keys1697        :para read_op: read operation object1698        :type 
read_op: int1699        :para keys: input key tuple1700        :type keys: tuple1701        :returns: an iterator over the the requested omap values, return value from this action1702        """1703        prval = c_int()1704        iter_addr = c_void_p()1705        key_num = len(keys)1706        key_array_type = c_char_p*key_num1707        key_array = key_array_type()1708        key_array[:] = keys1709        run_in_thread(self.librados.rados_read_op_omap_get_vals_by_keys,1710                      (c_void_p(read_op), byref(key_array), c_int(key_num),1711                       byref(iter_addr), pointer(prval)))1712        return OmapIterator(self, iter_addr), prval.value1713    @requires(('write_op', int), ('keys', tuple))1714    def remove_omap_keys(self, write_op, keys):1715        """1716        remove omap keys specifiled1717        :para write_op: write operation object1718        :type write_op: int1719        :para keys: input key tuple1720        :type keys: tuple1721        """1722        key_num = len(keys)1723        key_array_type = c_char_p*key_num1724        key_array = key_array_type()1725        key_array[:] = keys1726        run_in_thread(self.librados.rados_write_op_omap_rm_keys,1727                      (c_void_p(write_op), byref(key_array), c_int(key_num)))1728    @requires(('write_op', int))1729    def clear_omap(self, write_op):1730        """1731        Remove all key/value pairs from an object1732        :para write_op: write operation object1733        :type write_op: int1734        """1735        run_in_thread(self.librados.rados_write_op_omap_clear,1736                      (c_void_p(write_op),))1737    @requires(('key', str), ('name', str), ('cookie', str), ('desc', str),1738              ('duration', opt(int)), ('flags', int))1739    def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):1740        """1741        Take an exclusive lock on an object1742        :param key: name of the object1743        :type key: 
str1744        :param name: name of the lock1745        :type name: str1746        :param cookie: cookie of the lock1747        :type cookie: str1748        :param desc: description of the lock1749        :type desc: str1750        :param duration: duration of the lock in seconds1751        :type duration: int1752        :param flags: flags1753        :type flags: int1754        :raises: :class:`TypeError`1755        :raises: :class:`Error`1756        """1757        self.require_ioctx_open()1758        ret = run_in_thread(self.librados.rados_lock_exclusive,1759                            (self.io, c_char_p(key), c_char_p(name), c_char_p(cookie),1760                             c_char_p(desc),1761                             timeval(duration, None) if duration is None else None,1762                             c_uint8(flags)))1763        if ret < 0:1764            raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))1765    @requires(('key', str), ('name', str), ('cookie', str), ('tag', str),1766              ('desc', str), ('duration', opt(int)), ('flags', int))1767    def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):1768        """1769        Take a shared lock on an object1770        :param key: name of the object1771        :type key: str1772        :param name: name of the lock1773        :type name: str1774        :param cookie: cookie of the lock1775        :type cookie: str1776        :param tag: tag of the lock1777        :type tag: str1778        :param desc: description of the lock1779        :type desc: str1780        :param duration: duration of the lock in seconds1781        :type duration: int1782        :param flags: flags1783        :type flags: int1784        :raises: :class:`TypeError`1785        :raises: :class:`Error`1786        """1787        self.require_ioctx_open()1788        ret = run_in_thread(self.librados.rados_lock_shared,1789                            
(self.io, c_char_p(key), c_char_p(name), c_char_p(cookie),1790                             c_char_p(tag), c_char_p(desc),1791                             timeval(duration, None) if duration is None else None,1792                             c_uint8(flags)))1793        if ret < 0:1794            raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))1795    @requires(('key', str), ('name', str), ('cookie', str))1796    def unlock(self, key, name, cookie):1797        """1798        Release a shared or exclusive lock on an object1799        :param key: name of the object1800        :type key: str1801        :param name: name of the lock1802        :type name: str1803        :param cookie: cookie of the lock1804        :type cookie: str1805        :raises: :class:`TypeError`1806        :raises: :class:`Error`1807        """1808        self.require_ioctx_open()1809        ret = run_in_thread(self.librados.rados_unlock,1810                            (self.io, c_char_p(key), c_char_p(name), c_char_p(cookie)))1811        if ret < 0:1812            raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))1813def set_object_locator(func):1814    def retfunc(self, *args, **kwargs):1815        if self.locator_key is not None:1816            old_locator = self.ioctx.get_locator_key()1817            self.ioctx.set_locator_key(self.locator_key)1818            retval = func(self, *args, **kwargs)1819            self.ioctx.set_locator_key(old_locator)1820            return retval1821        else:1822            return func(self, *args, **kwargs)1823    return retfunc1824def set_object_namespace(func):1825    def retfunc(self, *args, **kwargs):1826        if self.nspace is None:1827            raise LogicError("Namespace not set properly in context")1828        old_nspace = self.ioctx.get_namespace()1829        self.ioctx.set_namespace(self.nspace)1830        retval = func(self, 
*args, **kwargs)1831        self.ioctx.set_namespace(old_nspace)1832        return retval1833    return retfunc1834class Object(object):1835    """Rados object wrapper, makes the object look like a file"""1836    def __init__(self, ioctx, key, locator_key=None, nspace=None):1837        self.key = key1838        self.ioctx = ioctx1839        self.offset = 01840        self.state = "exists"1841        self.locator_key = locator_key1842        self.nspace = "" if nspace is None else nspace1843    def __str__(self):1844        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \1845            (str(self.ioctx), self.key, "--default--"1846             if self.nspace is "" else self.nspace, self.locator_key)1847    def require_object_exists(self):1848        if self.state != "exists":1849            raise ObjectStateError("The object is %s" % self.state)1850    @set_object_locator1851    @set_object_namespace1852    def read(self, length=1024 * 1024):1853        self.require_object_exists()1854        ret = self.ioctx.read(self.key, length, self.offset)1855        self.offset += len(ret)1856        return ret1857    @set_object_locator1858    @set_object_namespace1859    def write(self, string_to_write):1860        self.require_object_exists()1861        ret = self.ioctx.write(self.key, string_to_write, self.offset)1862        if ret == 0:1863            self.offset += len(string_to_write)1864        return ret1865    @set_object_locator1866    @set_object_namespace1867    def remove(self):1868        self.require_object_exists()1869        self.ioctx.remove_object(self.key)1870        self.state = "removed"1871    @set_object_locator1872    @set_object_namespace1873    def stat(self):1874        self.require_object_exists()1875        return self.ioctx.stat(self.key)1876    def seek(self, position):1877        self.require_object_exists()1878        self.offset = position1879    @set_object_locator1880    @set_object_namespace1881    def get_xattr(self, 
xattr_name):1882        self.require_object_exists()1883        return self.ioctx.get_xattr(self.key, xattr_name)1884    @set_object_locator1885    @set_object_namespace1886    def get_xattrs(self):1887        self.require_object_exists()1888        return self.ioctx.get_xattrs(self.key)1889    @set_object_locator1890    @set_object_namespace1891    def set_xattr(self, xattr_name, xattr_value):1892        self.require_object_exists()1893        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)1894    @set_object_locator1895    @set_object_namespace1896    def rm_xattr(self, xattr_name):1897        self.require_object_exists()1898        return self.ioctx.rm_xattr(self.key, xattr_name)1899MONITOR_LEVELS = [1900    "debug",1901    "info",1902    "warn", "warning",1903    "err", "error",1904    "sec",1905    ]1906class MonitorLog(object):1907    """1908    For watching cluster log messages.  Instantiate an object and keep1909    it around while callback is periodically called.  
Construct with1910    'level' to monitor 'level' messages (one of MONITOR_LEVELS).1911    arg will be passed to the callback.1912    callback will be called with:1913        arg (given to __init__)1914        line (the full line, including timestamp, who, level, msg)1915        who (which entity issued the log message)1916        timestamp_sec (sec of a struct timespec)1917        timestamp_nsec (sec of a struct timespec)1918        seq (sequence number)1919        level (string representing the level of the log message)1920        msg (the message itself)1921    callback's return value is ignored1922    """1923    def monitor_log_callback(self, arg, line, who, sec, nsec, seq, level, msg):1924        """1925        Local callback wrapper, in case we decide to do something1926        """1927        self.callback(arg, line, who, sec, nsec, seq, level, msg)1928        return 01929    def __init__(self, cluster, level, callback, arg):1930        if level not in MONITOR_LEVELS:1931            raise LogicError("invalid monitor level " + level)1932        if not callable(callback):1933            raise LogicError("callback must be a callable function")1934        self.level = level1935        self.callback = callback1936        self.arg = arg1937        callback_factory = CFUNCTYPE(c_int,     # return type (really void)1938                                     c_void_p,  # arg1939                                     c_char_p,  # line1940                                     c_char_p,  # who1941                                     c_uint64,  # timestamp_sec1942                                     c_uint64,  # timestamp_nsec1943                                     c_ulong,   # seq1944                                     c_char_p,  # level1945                                     c_char_p)  # msg1946        self.internal_callback = callback_factory(self.monitor_log_callback)1947        r = run_in_thread(cluster.librados.rados_monitor_log,1948                          
(cluster.cluster, level, self.internal_callback, arg))1949        if r:...test_all.py
Source:test_all.py  
...4import pydron5import logging6from twisted.internet import threads7logging.basicConfig(level=logging.DEBUG)8def run_in_thread(func):9    def wrapper(*args, **kwargs):10        return threads.deferToThread(func, *args, **kwargs)11    return wrapper12class IntegrationTests(unittest.TestCase):13    14    @utwist.with_reactor15    @run_in_thread16    def test_default_return_value(self):17        @pydron.schedule18        def target():19            pass20        self.assertIsNone(target())21        22        ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
