How to use run_in_thread method in tox

Best Python code snippet using tox_python

rados.py

Source:rados.py Github

copy

Full Screen

...129 def run(self):130 self.retval = self.target(*self.args)131# time in seconds between each call to t.join() for child thread132POLL_TIME_INCR = 0.5133def run_in_thread(target, args, timeout=0):134 interrupt = False135 countdown = timeout136 t = RadosThread(target, args)137 # allow the main thread to exit (presumably, avoid a join() on this138 # subthread) before this thread terminates. This allows SIGINT139 # exit of a blocked call. See below.140 t.daemon = True141 t.start()142 try:143 # poll for thread exit144 while t.is_alive():145 t.join(POLL_TIME_INCR)146 if timeout and t.is_alive():147 countdown = countdown - POLL_TIME_INCR148 if countdown <= 0:149 raise KeyboardInterrupt150 t.join() # in case t exits before reaching the join() above151 except KeyboardInterrupt:152 # ..but allow SIGINT to terminate the waiting. Note: this153 # relies on the Linux kernel behavior of delivering the signal154 # to the main thread in preference to any subthread (all that's155 # strictly guaranteed is that *some* thread that has the signal156 # unblocked will receive it). But there doesn't seem to be157 # any interface to create t with SIGINT blocked.158 interrupt = True159 if interrupt:160 t.retval = -errno.EINTR161 return t.retval162# helper to specify an optional argument, where in addition to `cls`, `None`163# is also acceptable164def opt(cls):165 return (cls, None)166# validate argument types of an instance method167# kwargs is an un-ordered dict, so use args instead168def requires(*types):169 def is_type_of(v, t):170 if t is None:171 return v is None172 else:173 return isinstance(v, t)174 def check_type(val, arg_name, arg_type):175 if isinstance(arg_type, tuple):176 if any(is_type_of(val, t) for t in arg_type):177 return178 type_names = ' or '.join('None' if t is None else t.__name__179 for t in arg_type)180 raise TypeError('%s must be %s' % (arg_name, type_names))181 else:182 if is_type_of(val, arg_type):183 return184 assert(arg_type is not None)185 raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))186 def wrapper(f):187 @wraps(f)188 def validate_func(*args, **kwargs):189 # ignore the `self` arg190 pos_args = zip(args[1:], types)191 named_args = ((kwargs[name], (name, spec)) for name, spec in types192 if name in kwargs)193 for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):194 check_type(arg_val, arg_name, arg_type)195 return f(*args, **kwargs)196 return validate_func197 return wrapper198class Rados(object):199 """librados python wrapper"""200 def require_state(self, *args):201 """202 Checks if the Rados object is in a special state203 :raises: RadosStateError204 """205 if self.state in args:206 return207 raise RadosStateError("You cannot perform that operation on a \208Rados object in state %s." % self.state)209 @requires(('rados_id', opt(str)), ('name', opt(str)), ('clustername', opt(str)),210 ('conffile', opt(str)))211 def __init__(self, rados_id=None, name=None, clustername=None,212 conf_defaults=None, conffile=None, conf=None, flags=0):213 library_path = find_library('rados')214 # maybe find_library can not find it correctly on all platforms,215 # so fall back to librados.so.2 in such case.216 self.librados = CDLL(library_path if library_path is not None else 'librados.so.2')217 self.parsed_args = []218 self.conf_defaults = conf_defaults219 self.conffile = conffile220 self.cluster = c_void_p()221 self.rados_id = rados_id222 if rados_id and name:223 raise Error("Rados(): can't supply both rados_id and name")224 elif rados_id:225 name = 'client.' + rados_id226 elif name is None:227 name = 'client.admin'228 if clustername is None:229 clustername = 'ceph'230 ret = run_in_thread(self.librados.rados_create2,231 (byref(self.cluster), c_char_p(clustername),232 c_char_p(name), c_uint64(flags)))233 if ret != 0:234 raise Error("rados_initialize failed with error code: %d" % ret)235 self.state = "configuring"236 # order is important: conf_defaults, then conffile, then conf237 if conf_defaults:238 for key, value in conf_defaults.iteritems():239 self.conf_set(key, value)240 if conffile is not None:241 # read the default conf file when '' is given242 if conffile == '':243 conffile = None244 self.conf_read_file(conffile)245 if conf:246 for key, value in conf.iteritems():247 self.conf_set(key, value)248 def shutdown(self):249 """250 Disconnects from the cluster. Call this explicitly when a251 Rados.connect()ed object is no longer used.252 """253 if hasattr(self, "state") and self.state != "shutdown":254 run_in_thread(self.librados.rados_shutdown, (self.cluster,))255 self.state = "shutdown"256 def __enter__(self):257 self.connect()258 return self259 def __exit__(self, type_, value, traceback):260 self.shutdown()261 return False262 def version(self):263 """264 Get the version number of the ``librados`` C library.265 :returns: a tuple of ``(major, minor, extra)`` components of the266 librados version267 """268 major = c_int(0)269 minor = c_int(0)270 extra = c_int(0)271 run_in_thread(self.librados.rados_version,272 (byref(major), byref(minor), byref(extra)))273 return Version(major.value, minor.value, extra.value)274 @requires(('path', opt(str)))275 def conf_read_file(self, path=None):276 """277 Configure the cluster handle using a Ceph config file.278 :param path: path to the config file279 :type path: str280 """281 self.require_state("configuring", "connected")282 ret = run_in_thread(self.librados.rados_conf_read_file,283 (self.cluster, c_char_p(path)))284 if (ret != 0):285 raise make_ex(ret, "error calling conf_read_file")286 def conf_parse_argv(self, args):287 """288 Parse known arguments from args, and remove; returned289 args contain only those unknown to ceph290 """291 self.require_state("configuring", "connected")292 if not args:293 return294 # create instances of arrays of c_char_p's, both len(args) long295 # cretargs will always be a subset of cargs (perhaps identical)296 cargs = (c_char_p * len(args))(*args)297 cretargs = (c_char_p * len(args))()298 ret = run_in_thread(self.librados.rados_conf_parse_argv_remainder,299 (self.cluster, len(args), cargs, cretargs))300 if ret:301 raise make_ex(ret, "error calling conf_parse_argv_remainder")302 # cretargs was allocated with fixed length; collapse return303 # list to eliminate any missing args304 retargs = [a for a in cretargs if a is not None]305 self.parsed_args = args306 return retargs307 def conf_parse_env(self, var='CEPH_ARGS'):308 """309 Parse known arguments from an environment variable, normally310 CEPH_ARGS.311 """312 self.require_state("configuring", "connected")313 if not var:314 return315 ret = run_in_thread(self.librados.rados_conf_parse_env,316 (self.cluster, c_char_p(var)))317 if (ret != 0):318 raise make_ex(ret, "error calling conf_parse_env")319 @requires(('option', str))320 def conf_get(self, option):321 """322 Get the value of a configuration option323 :param option: which option to read324 :type option: str325 :returns: str - value of the option or None326 :raises: :class:`TypeError`327 """328 self.require_state("configuring", "connected")329 length = 20330 while True:331 ret_buf = create_string_buffer(length)332 ret = run_in_thread(self.librados.rados_conf_get,333 (self.cluster, c_char_p(option), ret_buf,334 c_size_t(length)))335 if (ret == 0):336 return ret_buf.value337 elif (ret == -errno.ENAMETOOLONG):338 length = length * 2339 elif (ret == -errno.ENOENT):340 return None341 else:342 raise make_ex(ret, "error calling conf_get")343 @requires(('option', str), ('val', str))344 def conf_set(self, option, val):345 """346 Set the value of a configuration option347 :param option: which option to set348 :type option: str349 :param option: value of the option350 :type option: str351 :raises: :class:`TypeError`, :class:`ObjectNotFound`352 """353 self.require_state("configuring", "connected")354 ret = run_in_thread(self.librados.rados_conf_set,355 (self.cluster, c_char_p(option), c_char_p(val)))356 if (ret != 0):357 raise make_ex(ret, "error calling conf_set")358 def ping_monitor(self, mon_id):359 """360 Ping a monitor to assess liveness361 May be used as a simply way to assess liveness, or to obtain362 information about the monitor in a simple way even in the363 absence of quorum.364 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)365 :type mon_id: str366 :returns: the string reply from the monitor367 """368 self.require_state("configuring", "connected")369 outstrp = pointer(pointer(c_char()))370 outstrlen = c_long()371 ret = run_in_thread(self.librados.rados_ping_monitor,372 (self.cluster, c_char_p(mon_id),373 outstrp, byref(outstrlen)))374 my_outstr = outstrp.contents[:(outstrlen.value)]375 if outstrlen.value:376 run_in_thread(self.librados.rados_buffer_free, (outstrp.contents,))377 if ret != 0:378 raise make_ex(ret, "error calling ping_monitor")379 return my_outstr380 def connect(self, timeout=0):381 """382 Connect to the cluster. Use shutdown() to release resources.383 """384 self.require_state("configuring")385 ret = run_in_thread(self.librados.rados_connect, (self.cluster,),386 timeout)387 if (ret != 0):388 raise make_ex(ret, "error connecting to the cluster")389 self.state = "connected"390 def get_cluster_stats(self):391 """392 Read usage info about the cluster393 This tells you total space, space used, space available, and number394 of objects. These are not updated immediately when data is written,395 they are eventually consistent.396 :returns: dict - contains the following keys:397 - ``kb`` (int) - total space398 - ``kb_used`` (int) - space used399 - ``kb_avail`` (int) - free space available400 - ``num_objects`` (int) - number of objects401 """402 stats = rados_cluster_stat_t()403 ret = run_in_thread(self.librados.rados_cluster_stat,404 (self.cluster, byref(stats)))405 if ret < 0:406 raise make_ex(407 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)408 return {'kb': stats.kb,409 'kb_used': stats.kb_used,410 'kb_avail': stats.kb_avail,411 'num_objects': stats.num_objects}412 @requires(('pool_name', str))413 def pool_exists(self, pool_name):414 """415 Checks if a given pool exists.416 :param pool_name: name of the pool to check417 :type pool_name: str418 :raises: :class:`TypeError`, :class:`Error`419 :returns: bool - whether the pool exists, false otherwise.420 """421 self.require_state("connected")422 ret = run_in_thread(self.librados.rados_pool_lookup,423 (self.cluster, c_char_p(pool_name)))424 if (ret >= 0):425 return True426 elif (ret == -errno.ENOENT):427 return False428 else:429 raise make_ex(ret, "error looking up pool '%s'" % pool_name)430 @requires(('pool_name', str))431 def pool_lookup(self, pool_name):432 """433 Returns a pool's ID based on its name.434 :param pool_name: name of the pool to look up435 :type pool_name: str436 :raises: :class:`TypeError`, :class:`Error`437 :returns: int - pool ID, or None if it doesn't exist438 """439 self.require_state("connected")440 ret = run_in_thread(self.librados.rados_pool_lookup,441 (self.cluster, c_char_p(pool_name)))442 if (ret >= 0):443 return int(ret)444 elif (ret == -errno.ENOENT):445 return None446 else:447 raise make_ex(ret, "error looking up pool '%s'" % pool_name)448 @requires(('pool_id', int))449 def pool_reverse_lookup(self, pool_id):450 """451 Returns a pool's name based on its ID.452 :param pool_id: ID of the pool to look up453 :type pool_id: int454 :raises: :class:`TypeError`, :class:`Error`455 :returns: string - pool name, or None if it doesn't exist456 """457 self.require_state("connected")458 size = c_size_t(512)459 while True:460 c_name = create_string_buffer(size.value)461 ret = run_in_thread(self.librados.rados_pool_reverse_lookup,462 (self.cluster, c_int64(pool_id), byref(c_name), size))463 if ret > size.value:464 size = c_size_t(ret)465 elif ret == -errno.ENOENT:466 return None467 elif ret < 0:468 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)469 else:470 return c_name.value471 break472 @requires(('pool_name', str), ('auid', opt(int)), ('crush_rule', opt(int)))473 def create_pool(self, pool_name, auid=None, crush_rule=None):474 """475 Create a pool:476 - with default settings: if auid=None and crush_rule=None477 - owned by a specific auid: auid given and crush_rule=None478 - with a specific CRUSH rule: if auid=None and crush_rule given479 - with a specific CRUSH rule and auid: if auid and crush_rule given480 :param pool_name: name of the pool to create481 :type pool_name: str482 :param auid: the id of the owner of the new pool483 :type auid: int484 :param crush_rule: rule to use for placement in the new pool485 :type crush_rule: int486 :raises: :class:`TypeError`, :class:`Error`487 """488 self.require_state("connected")489 if auid is None:490 if crush_rule is None:491 ret = run_in_thread(self.librados.rados_pool_create,492 (self.cluster, c_char_p(pool_name)))493 else:494 ret = run_in_thread(self.librados.495 rados_pool_create_with_crush_rule,496 (self.cluster, c_char_p(pool_name),497 c_ubyte(crush_rule)))498 elif crush_rule is None:499 ret = run_in_thread(self.librados.rados_pool_create_with_auid,500 (self.cluster, c_char_p(pool_name),501 c_uint64(auid)))502 else:503 ret = run_in_thread(self.librados.rados_pool_create_with_all,504 (self.cluster, c_char_p(pool_name),505 c_uint64(auid), c_ubyte(crush_rule)))506 if ret < 0:507 raise make_ex(ret, "error creating pool '%s'" % pool_name)508 @requires(('pool_id', int))509 def get_pool_base_tier(self, pool_id):510 """511 Get base pool512 :returns: base pool, or pool_id if tiering is not configured for the pool513 """514 self.require_state("connected")515 base_tier = c_int64(0)516 ret = run_in_thread(self.librados.rados_pool_get_base_tier,517 (self.cluster, c_int64(pool_id), byref(base_tier)))518 if ret < 0:519 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)520 return base_tier.value521 @requires(('pool_name', str))522 def delete_pool(self, pool_name):523 """524 Delete a pool and all data inside it.525 The pool is removed from the cluster immediately,526 but the actual data is deleted in the background.527 :param pool_name: name of the pool to delete528 :type pool_name: str529 :raises: :class:`TypeError`, :class:`Error`530 """531 self.require_state("connected")532 ret = run_in_thread(self.librados.rados_pool_delete,533 (self.cluster, c_char_p(pool_name)))534 if ret < 0:535 raise make_ex(ret, "error deleting pool '%s'" % pool_name)536 def list_pools(self):537 """538 Gets a list of pool names.539 :returns: list - of pool names.540 """541 self.require_state("connected")542 size = c_size_t(512)543 while True:544 c_names = create_string_buffer(size.value)545 ret = run_in_thread(self.librados.rados_pool_list,546 (self.cluster, byref(c_names), size))547 if ret > size.value:548 size = c_size_t(ret)549 else:550 break551 return filter(lambda name: name != '', c_names.raw.split('\0'))552 def get_fsid(self):553 """554 Get the fsid of the cluster as a hexadecimal string.555 :raises: :class:`Error`556 :returns: str - cluster fsid557 """558 self.require_state("connected")559 buf_len = 37560 fsid = create_string_buffer(buf_len)561 ret = run_in_thread(self.librados.rados_cluster_fsid,562 (self.cluster, byref(fsid), c_size_t(buf_len)))563 if ret < 0:564 raise make_ex(ret, "error getting cluster fsid")565 return fsid.value566 @requires(('ioctx_name', str))567 def open_ioctx(self, ioctx_name):568 """569 Create an io context570 The io context allows you to perform operations within a particular571 pool.572 :param ioctx_name: name of the pool573 :type ioctx_name: str574 :raises: :class:`TypeError`, :class:`Error`575 :returns: Ioctx - Rados Ioctx object576 """577 self.require_state("connected")578 ioctx = c_void_p()579 ret = run_in_thread(self.librados.rados_ioctx_create,580 (self.cluster, c_char_p(ioctx_name), byref(ioctx)))581 if ret < 0:582 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)583 return Ioctx(ioctx_name, self.librados, ioctx)584 def mon_command(self, cmd, inbuf, timeout=0, target=None):585 """586 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)587 returns (int ret, string outbuf, string outs)588 """589 self.require_state("connected")590 outbufp = pointer(pointer(c_char()))591 outbuflen = c_long()592 outsp = pointer(pointer(c_char()))593 outslen = c_long()594 cmdarr = (c_char_p * len(cmd))(*cmd)595 if target:596 ret = run_in_thread(self.librados.rados_mon_command_target,597 (self.cluster, c_char_p(target), cmdarr,598 len(cmd), c_char_p(inbuf), len(inbuf),599 outbufp, byref(outbuflen), outsp,600 byref(outslen)), timeout)601 else:602 ret = run_in_thread(self.librados.rados_mon_command,603 (self.cluster, cmdarr, len(cmd),604 c_char_p(inbuf), len(inbuf),605 outbufp, byref(outbuflen), outsp, byref(outslen)),606 timeout)607 # copy returned memory (ctypes makes a copy, not a reference)608 my_outbuf = outbufp.contents[:(outbuflen.value)]609 my_outs = outsp.contents[:(outslen.value)]610 # free callee's allocations611 if outbuflen.value:612 run_in_thread(self.librados.rados_buffer_free, (outbufp.contents,))613 if outslen.value:614 run_in_thread(self.librados.rados_buffer_free, (outsp.contents,))615 return (ret, my_outbuf, my_outs)616 def osd_command(self, osdid, cmd, inbuf, timeout=0):617 """618 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)619 returns (int ret, string outbuf, string outs)620 """621 self.require_state("connected")622 outbufp = pointer(pointer(c_char()))623 outbuflen = c_long()624 outsp = pointer(pointer(c_char()))625 outslen = c_long()626 cmdarr = (c_char_p * len(cmd))(*cmd)627 ret = run_in_thread(self.librados.rados_osd_command,628 (self.cluster, osdid, cmdarr, len(cmd),629 c_char_p(inbuf), len(inbuf),630 outbufp, byref(outbuflen), outsp, byref(outslen)),631 timeout)632 # copy returned memory (ctypes makes a copy, not a reference)633 my_outbuf = outbufp.contents[:(outbuflen.value)]634 my_outs = outsp.contents[:(outslen.value)]635 # free callee's allocations636 if outbuflen.value:637 run_in_thread(self.librados.rados_buffer_free, (outbufp.contents,))638 if outslen.value:639 run_in_thread(self.librados.rados_buffer_free, (outsp.contents,))640 return (ret, my_outbuf, my_outs)641 def pg_command(self, pgid, cmd, inbuf, timeout=0):642 """643 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)644 returns (int ret, string outbuf, string outs)645 """646 self.require_state("connected")647 outbufp = pointer(pointer(c_char()))648 outbuflen = c_long()649 outsp = pointer(pointer(c_char()))650 outslen = c_long()651 cmdarr = (c_char_p * len(cmd))(*cmd)652 ret = run_in_thread(self.librados.rados_pg_command,653 (self.cluster, c_char_p(pgid), cmdarr, len(cmd),654 c_char_p(inbuf), len(inbuf),655 outbufp, byref(outbuflen), outsp, byref(outslen)),656 timeout)657 # copy returned memory (ctypes makes a copy, not a reference)658 my_outbuf = outbufp.contents[:(outbuflen.value)]659 my_outs = outsp.contents[:(outslen.value)]660 # free callee's allocations661 if outbuflen.value:662 run_in_thread(self.librados.rados_buffer_free, (outbufp.contents,))663 if outslen.value:664 run_in_thread(self.librados.rados_buffer_free, (outsp.contents,))665 return (ret, my_outbuf, my_outs)666 def wait_for_latest_osdmap(self):667 self.require_state("connected")668 return run_in_thread(self.librados.rados_wait_for_latest_osdmap, (self.cluster,))669 def blacklist_add(self, client_address, expire_seconds=0):670 """671 Blacklist a client from the OSDs672 :param client_address: client address673 :type client_address: str674 :param expire_seconds: number of seconds to blacklist675 :type expire_seconds: int676 :raises: :class:`Error`677 """678 self.require_state("connected")679 ret = run_in_thread(self.librados.rados_blacklist_add,680 (self.cluster, c_char_p(client_address),681 c_uint32(expire_seconds)))682 if ret < 0:683 raise make_ex(ret, "error blacklisting client '%s'" % client_address)684class OmapIterator(object):685 """Omap iterator"""686 def __init__(self, ioctx, ctx):687 self.ioctx = ioctx688 self.ctx = ctx689 def __iter__(self):690 return self691 def next(self):692 """693 Get the next key-value pair in the object694 :returns: next rados.OmapItem695 """696 key_ = c_char_p(0)697 val_ = c_char_p(0)698 len_ = c_int(0)699 ret = run_in_thread(self.ioctx.librados.rados_omap_get_next,700 (self.ctx, byref(key_), byref(val_), byref(len_)))701 if (ret != 0):702 raise make_ex(ret, "error iterating over the omap")703 if key_.value is None:704 raise StopIteration()705 key = ctypes.string_at(key_)706 val = None707 if val_.value is not None:708 val = ctypes.string_at(val_, len_)709 return (key, val)710 def __del__(self):711 run_in_thread(self.ioctx.librados.rados_omap_get_end, (self.ctx,))712class ObjectIterator(object):713 """rados.Ioctx Object iterator"""714 def __init__(self, ioctx):715 self.ioctx = ioctx716 self.ctx = c_void_p()717 ret = run_in_thread(self.ioctx.librados.rados_nobjects_list_open,718 (self.ioctx.io, byref(self.ctx)))719 if ret < 0:720 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"721 % self.ioctx.name)722 def __iter__(self):723 return self724 def next(self):725 """726 Get the next object name and locator in the pool727 :raises: StopIteration728 :returns: next rados.Ioctx Object729 """730 key = c_char_p()731 locator = c_char_p()732 nspace = c_char_p()733 ret = run_in_thread(self.ioctx.librados.rados_nobjects_list_next,734 (self.ctx, byref(key), byref(locator), byref(nspace)))735 if ret < 0:736 raise StopIteration()737 return Object(self.ioctx, key.value, locator.value, nspace.value)738 def __del__(self):739 run_in_thread(self.ioctx.librados.rados_nobjects_list_close, (self.ctx,))740class XattrIterator(object):741 """Extended attribute iterator"""742 def __init__(self, ioctx, it, oid):743 self.ioctx = ioctx744 self.it = it745 self.oid = oid746 def __iter__(self):747 return self748 def next(self):749 """750 Get the next xattr on the object751 :raises: StopIteration752 :returns: pair - of name and value of the next Xattr753 """754 name_ = c_char_p(0)755 val_ = c_char_p(0)756 len_ = c_int(0)757 ret = run_in_thread(self.ioctx.librados.rados_getxattrs_next,758 (self.it, byref(name_), byref(val_), byref(len_)))759 if (ret != 0):760 raise make_ex(ret, "error iterating over the extended attributes \761in '%s'" % self.oid)762 if name_.value is None:763 raise StopIteration()764 name = ctypes.string_at(name_)765 val = ctypes.string_at(val_, len_)766 return (name, val)767 def __del__(self):768 run_in_thread(self.ioctx.librados.rados_getxattrs_end, (self.it,))769class SnapIterator(object):770 """Snapshot iterator"""771 def __init__(self, ioctx):772 self.ioctx = ioctx773 # We don't know how big a buffer we need until we've called the774 # function. So use the exponential doubling strategy.775 num_snaps = 10776 while True:777 self.snaps = (ctypes.c_uint64 * num_snaps)()778 ret = run_in_thread(self.ioctx.librados.rados_ioctx_snap_list,779 (self.ioctx.io, self.snaps, c_int(num_snaps)))780 if (ret >= 0):781 self.max_snap = ret782 break783 elif (ret != -errno.ERANGE):784 raise make_ex(ret, "error calling rados_snap_list for \785ioctx '%s'" % self.ioctx.name)786 num_snaps = num_snaps * 2787 self.cur_snap = 0788 def __iter__(self):789 return self790 def next(self):791 """792 Get the next Snapshot793 :raises: :class:`Error`, StopIteration794 :returns: Snap - next snapshot795 """796 if (self.cur_snap >= self.max_snap):797 raise StopIteration798 snap_id = self.snaps[self.cur_snap]799 name_len = 10800 while True:801 name = create_string_buffer(name_len)802 ret = run_in_thread(self.ioctx.librados.rados_ioctx_snap_get_name,803 (self.ioctx.io, c_uint64(snap_id), byref(name),804 c_int(name_len)))805 if (ret == 0):806 name_len = ret807 break808 elif (ret != -errno.ERANGE):809 raise make_ex(ret, "rados_snap_get_name error")810 name_len = name_len * 2811 snap = Snap(self.ioctx, name.value, snap_id)812 self.cur_snap = self.cur_snap + 1813 return snap814class Snap(object):815 """Snapshot object"""816 def __init__(self, ioctx, name, snap_id):817 self.ioctx = ioctx818 self.name = name819 self.snap_id = snap_id820 def __str__(self):821 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \822 % (str(self.ioctx), self.name, self.snap_id)823 def get_timestamp(self):824 """825 Find when a snapshot in the current pool occurred826 :raises: :class:`Error`827 :returns: datetime - the data and time the snapshot was created828 """829 snap_time = c_long(0)830 ret = run_in_thread(self.ioctx.librados.rados_ioctx_snap_get_stamp,831 (self.ioctx.io, self.snap_id, byref(snap_time)))832 if (ret != 0):833 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")834 return datetime.fromtimestamp(snap_time.value)835class Completion(object):836 """completion object"""837 def __init__(self, ioctx, rados_comp, oncomplete, onsafe,838 complete_cb, safe_cb):839 self.rados_comp = rados_comp840 self.oncomplete = oncomplete841 self.onsafe = onsafe842 self.ioctx = ioctx843 self.complete_cb = complete_cb844 self.safe_cb = safe_cb845 def is_safe(self):846 """847 Is an asynchronous operation safe?848 This does not imply that the safe callback has finished.849 :returns: True if the operation is safe850 """851 return run_in_thread(self.ioctx.librados.rados_aio_is_safe,852 (self.rados_comp,)) == 1853 def is_complete(self):854 """855 Has an asynchronous operation completed?856 This does not imply that the safe callback has finished.857 :returns: True if the operation is completed858 """859 return run_in_thread(self.ioctx.librados.rados_aio_is_complete,860 (self.rados_comp,)) == 1861 def wait_for_safe(self):862 """863 Wait for an asynchronous operation to be marked safe864 This does not imply that the safe callback has finished.865 """866 run_in_thread(self.ioctx.librados.rados_aio_wait_for_safe,867 (self.rados_comp,))868 def wait_for_complete(self):869 """870 Wait for an asynchronous operation to complete871 This does not imply that the complete callback has finished.872 """873 run_in_thread(self.ioctx.librados.rados_aio_wait_for_complete,874 (self.rados_comp,))875 def wait_for_safe_and_cb(self):876 """877 Wait for an asynchronous operation to be marked safe and for878 the safe callback to have returned879 """880 run_in_thread(self.ioctx.librados.rados_aio_wait_for_safe_and_cb,881 (self.rados_comp,))882 def wait_for_complete_and_cb(self):883 """884 Wait for an asynchronous operation to complete and for the885 complete callback to have returned886 :returns: whether the operation is completed887 """888 return run_in_thread(889 self.ioctx.librados.rados_aio_wait_for_complete_and_cb,890 (self.rados_comp,)891 )892 def get_return_value(self):893 """894 Get the return value of an asychronous operation895 The return value is set when the operation is complete or safe,896 whichever comes first.897 :returns: int - return value of the operation898 """899 return run_in_thread(self.ioctx.librados.rados_aio_get_return_value,900 (self.rados_comp,))901 def __del__(self):902 """903 Release a completion904 Call this when you no longer need the completion. It may not be905 freed immediately if the operation is not acked and committed.906 """907 run_in_thread(self.ioctx.librados.rados_aio_release,908 (self.rados_comp,))909class WriteOpCtx(object):910 """write operation context manager"""911 def __init__(self, ioctx):912 self.ioctx = ioctx913 def __enter__(self):914 self.ioctx.librados.rados_create_write_op.restype = c_void_p915 ret = run_in_thread(self.ioctx.librados.rados_create_write_op, (None,))916 self.write_op = ret917 return ret918 def __exit__(self, type, msg, traceback):919 self.ioctx.librados.rados_release_write_op.argtypes = [c_void_p]920 run_in_thread(self.ioctx.librados.rados_release_write_op, (c_void_p(self.write_op),))921class ReadOpCtx(object):922 """read operation context manager"""923 def __init__(self, ioctx):924 self.ioctx = ioctx925 def __enter__(self):926 self.ioctx.librados.rados_create_read_op.restype = c_void_p927 ret = run_in_thread(self.ioctx.librados.rados_create_read_op, (None,))928 self.read_op = ret929 return ret930 def __exit__(self, type, msg, traceback):931 self.ioctx.librados.rados_release_read_op.argtypes = [c_void_p]932 run_in_thread(self.ioctx.librados.rados_release_read_op, (c_void_p(self.read_op),))933RADOS_CB = CFUNCTYPE(c_int, c_void_p, c_void_p)934class Ioctx(object):935 """rados.Ioctx object"""936 def __init__(self, name, librados, io):937 self.name = name938 self.librados = librados939 self.io = io940 self.state = "open"941 self.locator_key = ""942 self.nspace = ""943 self.safe_cbs = {}944 self.complete_cbs = {}945 self.lock = threading.Lock()946 def __enter__(self):947 return self948 def __exit__(self, type_, value, traceback):949 self.close()950 return False951 def __del__(self):952 self.close()953 def __aio_safe_cb(self, completion, _):954 """955 Callback to onsafe() for asynchronous operations956 """957 cb = None958 with self.lock:959 cb = self.safe_cbs[completion]960 del self.safe_cbs[completion]961 cb.onsafe(cb)962 return 0963 def __aio_complete_cb(self, completion, _):964 """965 Callback to oncomplete() for asynchronous operations966 """967 cb = None968 with self.lock:969 cb = self.complete_cbs[completion]970 del self.complete_cbs[completion]971 cb.oncomplete(cb)972 return 0973 def __get_completion(self, oncomplete, onsafe):974 """975 Constructs a completion to use with asynchronous operations976 :param oncomplete: what to do when the write is safe and complete in memory977 on all replicas978 :type oncomplete: completion979 :param onsafe: what to do when the write is safe and complete on storage980 on all replicas981 :type onsafe: completion982 :raises: :class:`Error`983 :returns: completion object984 """985 completion = c_void_p(0)986 complete_cb = None987 safe_cb = None988 if oncomplete:989 complete_cb = RADOS_CB(self.__aio_complete_cb)990 if onsafe:991 safe_cb = RADOS_CB(self.__aio_safe_cb)992 ret = run_in_thread(self.librados.rados_aio_create_completion,993 (c_void_p(0), complete_cb, safe_cb,994 byref(completion)))995 if ret < 0:996 raise make_ex(ret, "error getting a completion")997 with self.lock:998 completion_obj = Completion(self, completion, oncomplete, onsafe,999 complete_cb, safe_cb)1000 if oncomplete:1001 self.complete_cbs[completion.value] = completion_obj1002 if onsafe:1003 self.safe_cbs[completion.value] = completion_obj1004 return completion_obj1005 def aio_write(self, object_name, to_write, offset=0,1006 oncomplete=None, onsafe=None):1007 """1008 Write data to an object asynchronously1009 Queues the write and returns.1010 :param object_name: name of the object1011 :type object_name: str1012 :param to_write: data to write1013 :type to_write: str1014 :param offset: byte offset in the object to begin writing at1015 :type offset: int1016 :param oncomplete: what to do when the write is safe and complete in memory1017 on all replicas1018 :type oncomplete: completion1019 :param onsafe: what to do when the write is safe and complete on storage1020 on all replicas1021 :type onsafe: completion1022 :raises: :class:`Error`1023 :returns: completion object1024 """1025 completion = self.__get_completion(oncomplete, onsafe)1026 ret = run_in_thread(self.librados.rados_aio_write,1027 (self.io, c_char_p(object_name),1028 completion.rados_comp, c_char_p(to_write),1029 c_size_t(len(to_write)), c_uint64(offset)))1030 if ret < 0:1031 raise make_ex(ret, "error writing object %s" % object_name)1032 return completion1033 def aio_write_full(self, object_name, to_write,1034 oncomplete=None, onsafe=None):1035 """1036 Asychronously write an entire object1037 The object is filled with the provided data. If the object exists,1038 it is atomically truncated and then written.1039 Queues the write and returns.1040 :param object_name: name of the object1041 :type object_name: str1042 :param to_write: data to write1043 :type to_write: str1044 :param oncomplete: what to do when the write is safe and complete in memory1045 on all replicas1046 :type oncomplete: completion1047 :param onsafe: what to do when the write is safe and complete on storage1048 on all replicas1049 :type onsafe: completion1050 :raises: :class:`Error`1051 :returns: completion object1052 """1053 completion = self.__get_completion(oncomplete, onsafe)1054 ret = run_in_thread(self.librados.rados_aio_write_full,1055 (self.io, c_char_p(object_name),1056 completion.rados_comp, c_char_p(to_write),1057 c_size_t(len(to_write))))1058 if ret < 0:1059 raise make_ex(ret, "error writing object %s" % object_name)1060 return completion1061 def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):1062 """1063 Asychronously append data to an object1064 Queues the write and returns.1065 :param object_name: name of the object1066 :type object_name: str1067 :param to_append: data to append1068 :type to_append: str1069 :param offset: byte offset in the object to begin writing at1070 :type offset: int1071 :param oncomplete: what to do when the write is safe and complete in memory1072 on all replicas1073 :type oncomplete: completion1074 :param onsafe: what to do when the write is safe and complete on storage1075 on all replicas1076 :type onsafe: completion1077 :raises: :class:`Error`1078 :returns: completion object1079 """1080 completion = self.__get_completion(oncomplete, onsafe)1081 ret = run_in_thread(self.librados.rados_aio_append,1082 (self.io, c_char_p(object_name),1083 completion.rados_comp, c_char_p(to_append),1084 c_size_t(len(to_append))))1085 if ret < 0:1086 raise make_ex(ret, "error appending to object %s" % object_name)1087 return completion1088 def aio_flush(self):1089 """1090 Block until all pending writes in an io context are safe1091 :raises: :class:`Error`1092 """1093 ret = run_in_thread(self.librados.rados_aio_flush, (self.io,))1094 if ret < 0:1095 raise make_ex(ret, "error flushing")1096 def aio_read(self, object_name, length, offset, oncomplete):1097 """1098 Asychronously read data from an object1099 oncomplete will be called with the returned read value as1100 well as the completion:1101 oncomplete(completion, data_read)1102 :param object_name: name of the object to read from1103 :type object_name: str1104 :param length: the number of bytes to read1105 :type length: int1106 :param offset: byte offset in the object to begin reading from1107 :type offset: int1108 :param oncomplete: what to do when the read is complete1109 :type oncomplete: completion1110 :raises: :class:`Error`1111 :returns: completion object1112 """1113 buf = create_string_buffer(length)1114 def oncomplete_(completion_v):1115 return_value = completion_v.get_return_value()1116 return oncomplete(completion_v,1117 ctypes.string_at(buf, return_value) if return_value >= 0 else None)1118 completion = self.__get_completion(oncomplete_, None)1119 ret = run_in_thread(self.librados.rados_aio_read,1120 (self.io, c_char_p(object_name),1121 completion.rados_comp, buf, c_size_t(length),1122 c_uint64(offset)))1123 if ret < 0:1124 raise make_ex(ret, "error reading %s" % object_name)1125 return completion1126 def aio_remove(self, object_name, oncomplete=None, onsafe=None):1127 """1128 Asychronously remove an object1129 :param object_name: name of the object to remove1130 :type object_name: str1131 :param oncomplete: what to do when the remove is safe and complete in memory1132 on all replicas1133 :type oncomplete: completion1134 :param onsafe: what to do when the remove is safe and complete on storage1135 on all replicas1136 :type onsafe: completion1137 :raises: :class:`Error`1138 :returns: completion object1139 """1140 completion = self.__get_completion(oncomplete, onsafe)1141 ret = run_in_thread(self.librados.rados_aio_remove,1142 (self.io, c_char_p(object_name),1143 completion.rados_comp))1144 if ret < 0:1145 raise make_ex(ret, "error removing %s" % object_name)1146 return completion1147 def require_ioctx_open(self):1148 """1149 Checks if the rados.Ioctx object state is 'open'1150 :raises: IoctxStateError1151 """1152 if self.state != "open":1153 raise IoctxStateError("The pool is %s" % self.state)1154 def change_auid(self, auid):1155 """1156 Attempt to change an io context's associated auid "owner."1157 Requires that you have write permission on both the current and new1158 auid.1159 :raises: :class:`Error`1160 """1161 self.require_ioctx_open()1162 ret = run_in_thread(self.librados.rados_ioctx_pool_set_auid,1163 (self.io, ctypes.c_uint64(auid)))1164 if ret < 0:1165 raise make_ex(ret, "error changing auid of '%s' to %d"1166 % (self.name, auid))1167 @requires(('loc_key', str))1168 def set_locator_key(self, loc_key):1169 """1170 Set the key for mapping objects to pgs within an io context.1171 The key is used instead of the object name to determine which1172 placement groups an object is put in. This affects all subsequent1173 operations of the io context - until a different locator key is1174 set, all objects in this io context will be placed in the same pg.1175 :param loc_key: the key to use as the object locator, or NULL to discard1176 any previously set key1177 :type loc_key: str1178 :raises: :class:`TypeError`1179 """1180 self.require_ioctx_open()1181 run_in_thread(self.librados.rados_ioctx_locator_set_key,1182 (self.io, c_char_p(loc_key)))1183 self.locator_key = loc_key1184 def get_locator_key(self):1185 """1186 Get the locator_key of context1187 :returns: locator_key1188 """1189 return self.locator_key1190 @requires(('nspace', str))1191 def set_namespace(self, nspace):1192 """1193 Set the namespace for objects within an io context.1194 The namespace in addition to the object name fully identifies1195 an object. This affects all subsequent operations of the io context1196 - until a different namespace is set, all objects in this io context1197 will be placed in the same namespace.1198 :param nspace: the namespace to use, or None/"" for the default namespace1199 :type nspace: str1200 :raises: :class:`TypeError`1201 """1202 self.require_ioctx_open()1203 if nspace is None:1204 nspace = ""1205 run_in_thread(self.librados.rados_ioctx_set_namespace,1206 (self.io, c_char_p(nspace)))1207 self.nspace = nspace1208 def get_namespace(self):1209 """1210 Get the namespace of context1211 :returns: namespace1212 """1213 return self.nspace1214 def close(self):1215 """1216 Close a rados.Ioctx object.1217 This just tells librados that you no longer need to use the io context.1218 It may not be freed immediately if there are pending asynchronous1219 requests on it, but you should not use an io context again after1220 calling this function on it.1221 """1222 if self.state == "open":1223 self.require_ioctx_open()1224 run_in_thread(self.librados.rados_ioctx_destroy, (self.io,))1225 self.state = "closed"1226 @requires(('key', str), ('data', str))1227 def write(self, key, data, offset=0):1228 """1229 Write data to an object synchronously1230 :param key: name of the object1231 :type key: str1232 :param data: data to write1233 :type data: str1234 :param offset: byte offset in the object to begin writing at1235 :type offset: int1236 :raises: :class:`TypeError`1237 :raises: :class:`LogicError`1238 :returns: int - 0 on success1239 """1240 self.require_ioctx_open()1241 length = len(data)1242 ret = run_in_thread(self.librados.rados_write,1243 (self.io, c_char_p(key), c_char_p(data),1244 c_size_t(length), c_uint64(offset)))1245 if ret == 0:1246 return ret1247 elif ret < 0:1248 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"1249 % (self.name, key))1250 else:1251 raise LogicError("Ioctx.write(%s): rados_write \1252returned %d, but should return zero on success." % (self.name, ret))1253 @requires(('key', str), ('data', str))1254 def write_full(self, key, data):1255 """1256 Write an entire object synchronously.1257 The object is filled with the provided data. If the object exists,1258 it is atomically truncated and then written.1259 :param key: name of the object1260 :type key: str1261 :param data: data to write1262 :type data: str1263 :raises: :class:`TypeError`1264 :raises: :class:`Error`1265 :returns: int - 0 on success1266 """1267 self.require_ioctx_open()1268 length = len(data)1269 ret = run_in_thread(self.librados.rados_write_full,1270 (self.io, c_char_p(key), c_char_p(data),1271 c_size_t(length)))1272 if ret == 0:1273 return ret1274 elif ret < 0:1275 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"1276 % (self.name, key))1277 else:1278 raise LogicError("Ioctx.write_full(%s): rados_write_full \1279returned %d, but should return zero on success." % (self.name, ret))1280 @requires(('key', str), ('data', str))1281 def append(self, key, data):1282 """1283 Append data to an object synchronously1284 :param key: name of the object1285 :type key: str1286 :param data: data to write1287 :type data: str1288 :raises: :class:`TypeError`1289 :raises: :class:`LogicError`1290 :returns: int - 0 on success1291 """1292 self.require_ioctx_open()1293 length = len(data)1294 ret = run_in_thread(self.librados.rados_append,1295 (self.io, c_char_p(key), c_char_p(data),1296 c_size_t(length)))1297 if ret == 0:1298 return ret1299 elif ret < 0:1300 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"1301 % (self.name, key))1302 else:1303 raise LogicError("Ioctx.append(%s): rados_append \1304returned %d, but should return zero on success." % (self.name, ret))1305 @requires(('key', str))1306 def read(self, key, length=8192, offset=0):1307 """1308 Read data from an object synchronously1309 :param key: name of the object1310 :type key: str1311 :param length: the number of bytes to read (default=8192)1312 :type length: int1313 :param offset: byte offset in the object to begin reading at1314 :type offset: int1315 :raises: :class:`TypeError`1316 :raises: :class:`Error`1317 :returns: str - data read from object1318 """1319 self.require_ioctx_open()1320 ret_buf = create_string_buffer(length)1321 ret = run_in_thread(self.librados.rados_read,1322 (self.io, c_char_p(key), ret_buf, c_size_t(length),1323 c_uint64(offset)))1324 if ret < 0:1325 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))1326 return ctypes.string_at(ret_buf, ret)1327 def get_stats(self):1328 """1329 Get pool usage statistics1330 :returns: dict - contains the following keys:1331 - ``num_bytes`` (int) - size of pool in bytes1332 - ``num_kb`` (int) - size of pool in kbytes1333 - ``num_objects`` (int) - number of objects in the pool1334 - ``num_object_clones`` (int) - number of object clones1335 - ``num_object_copies`` (int) - number of object copies1336 - ``num_objects_missing_on_primary`` (int) - number of objets1337 missing on primary1338 - ``num_objects_unfound`` (int) - number of unfound objects1339 - ``num_objects_degraded`` (int) - number of degraded objects1340 - ``num_rd`` (int) - bytes read1341 - ``num_rd_kb`` (int) - kbytes read1342 - ``num_wr`` (int) - bytes written1343 - ``num_wr_kb`` (int) - kbytes written1344 """1345 self.require_ioctx_open()1346 stats = rados_pool_stat_t()1347 ret = run_in_thread(self.librados.rados_ioctx_pool_stat,1348 (self.io, byref(stats)))1349 if ret < 0:1350 raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)1351 return {'num_bytes': stats.num_bytes,1352 'num_kb': stats.num_kb,1353 'num_objects': stats.num_objects,1354 'num_object_clones': stats.num_object_clones,1355 'num_object_copies': stats.num_object_copies,1356 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,1357 "num_objects_unfound": stats.num_objects_unfound,1358 "num_objects_degraded": stats.num_objects_degraded,1359 "num_rd": stats.num_rd,1360 "num_rd_kb": stats.num_rd_kb,1361 "num_wr": stats.num_wr,1362 "num_wr_kb": stats.num_wr_kb}1363 @requires(('key', str))1364 def remove_object(self, key):1365 """1366 Delete an object1367 This does not delete any snapshots of the object.1368 :param key: the name of the object to delete1369 :type key: str1370 :raises: :class:`TypeError`1371 :raises: :class:`Error`1372 :returns: bool - True on success1373 """1374 self.require_ioctx_open()1375 ret = run_in_thread(self.librados.rados_remove,1376 (self.io, c_char_p(key)))1377 if ret < 0:1378 raise make_ex(ret, "Failed to remove '%s'" % key)1379 return True1380 @requires(('key', str))1381 def trunc(self, key, size):1382 """1383 Resize an object1384 If this enlarges the object, the new area is logically filled with1385 zeroes. If this shrinks the object, the excess data is removed.1386 :param key: the name of the object to resize1387 :type key: str1388 :param size: the new size of the object in bytes1389 :type size: int1390 :raises: :class:`TypeError`1391 :raises: :class:`Error`1392 :returns: int - 0 on success, otherwise raises error1393 """1394 self.require_ioctx_open()1395 ret = run_in_thread(self.librados.rados_trunc,1396 (self.io, c_char_p(key), c_uint64(size)))1397 if ret < 0:1398 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))1399 return ret1400 @requires(('key', str))1401 def stat(self, key):1402 """1403 Get object stats (size/mtime)1404 :param key: the name of the object to get stats from1405 :type key: str1406 :raises: :class:`TypeError`1407 :raises: :class:`Error`1408 :returns: (size,timestamp)1409 """1410 self.require_ioctx_open()1411 psize = c_uint64()1412 pmtime = c_uint64()1413 ret = run_in_thread(self.librados.rados_stat,1414 (self.io, c_char_p(key), pointer(psize),1415 pointer(pmtime)))1416 if ret < 0:1417 raise make_ex(ret, "Failed to stat %r" % key)1418 return psize.value, time.localtime(pmtime.value)1419 @requires(('key', str), ('xattr_name', str))1420 def get_xattr(self, key, xattr_name):1421 """1422 Get the value of an extended attribute on an object.1423 :param key: the name of the object to get xattr from1424 :type key: str1425 :param xattr_name: which extended attribute to read1426 :type xattr_name: str1427 :raises: :class:`TypeError`1428 :raises: :class:`Error`1429 :returns: str - value of the xattr1430 """1431 self.require_ioctx_open()1432 ret_length = 40961433 while ret_length < 4096 * 1024 * 1024:1434 ret_buf = create_string_buffer(ret_length)1435 ret = run_in_thread(self.librados.rados_getxattr,1436 (self.io, c_char_p(key), c_char_p(xattr_name),1437 ret_buf, c_size_t(ret_length)))1438 if (ret == -errno.ERANGE):1439 ret_length *= 21440 elif ret < 0:1441 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)1442 else:1443 break1444 return ctypes.string_at(ret_buf, ret)1445 @requires(('oid', str))1446 def get_xattrs(self, oid):1447 """1448 Start iterating over xattrs on an object.1449 :param oid: the name of the object to get xattrs from1450 :type key: str1451 :raises: :class:`TypeError`1452 :raises: :class:`Error`1453 :returns: XattrIterator1454 """1455 self.require_ioctx_open()1456 it = c_void_p(0)1457 ret = run_in_thread(self.librados.rados_getxattrs,1458 (self.io, oid, byref(it)))1459 if ret != 0:1460 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)1461 return XattrIterator(self, it, oid)1462 @requires(('key', str), ('xattr_name', str), ('xattr_value', str))1463 def set_xattr(self, key, xattr_name, xattr_value):1464 """1465 Set an extended attribute on an object.1466 :param key: the name of the object to set xattr to1467 :type key: str1468 :param xattr_name: which extended attribute to set1469 :type xattr_name: str1470 :param xattr_value: the value of the extended attribute1471 :type xattr_value: str1472 :raises: :class:`TypeError`1473 :raises: :class:`Error`1474 :returns: bool - True on success, otherwise raise an error1475 """1476 self.require_ioctx_open()1477 ret = run_in_thread(self.librados.rados_setxattr,1478 (self.io, c_char_p(key), c_char_p(xattr_name),1479 c_char_p(xattr_value), c_size_t(len(xattr_value))))1480 if ret < 0:1481 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)1482 return True1483 @requires(('key', str), ('xattr_name', str))1484 def rm_xattr(self, key, xattr_name):1485 """1486 Removes an extended attribute on from an object.1487 :param key: the name of the object to remove xattr from1488 :type key: str1489 :param xattr_name: which extended attribute to remove1490 :type xattr_name: str1491 :raises: :class:`TypeError`1492 :raises: :class:`Error`1493 :returns: bool - True on success, otherwise raise an error1494 """1495 self.require_ioctx_open()1496 ret = run_in_thread(self.librados.rados_rmxattr,1497 (self.io, c_char_p(key), c_char_p(xattr_name)))1498 if ret < 0:1499 raise make_ex(ret, "Failed to delete key %r xattr %r" %1500 (key, xattr_name))1501 return True1502 def list_objects(self):1503 """1504 Get ObjectIterator on rados.Ioctx object.1505 :returns: ObjectIterator1506 """1507 self.require_ioctx_open()1508 return ObjectIterator(self)1509 def list_snaps(self):1510 """1511 Get SnapIterator on rados.Ioctx object.1512 :returns: SnapIterator1513 """1514 self.require_ioctx_open()1515 return SnapIterator(self)1516 @requires(('snap_name', str))1517 def create_snap(self, snap_name):1518 """1519 Create a pool-wide snapshot1520 :param snap_name: the name of the snapshot1521 :type snap_name: str1522 :raises: :class:`TypeError`1523 :raises: :class:`Error`1524 """1525 self.require_ioctx_open()1526 ret = run_in_thread(self.librados.rados_ioctx_snap_create,1527 (self.io, c_char_p(snap_name)))1528 if (ret != 0):1529 raise make_ex(ret, "Failed to create snap %s" % snap_name)1530 @requires(('snap_name', str))1531 def remove_snap(self, snap_name):1532 """1533 Removes a pool-wide snapshot1534 :param snap_name: the name of the snapshot1535 :type snap_name: str1536 :raises: :class:`TypeError`1537 :raises: :class:`Error`1538 """1539 self.require_ioctx_open()1540 ret = run_in_thread(self.librados.rados_ioctx_snap_remove,1541 (self.io, c_char_p(snap_name)))1542 if (ret != 0):1543 raise make_ex(ret, "Failed to remove snap %s" % snap_name)1544 @requires(('snap_name', str))1545 def lookup_snap(self, snap_name):1546 """1547 Get the id of a pool snapshot1548 :param snap_name: the name of the snapshot to lookop1549 :type snap_name: str1550 :raises: :class:`TypeError`1551 :raises: :class:`Error`1552 :returns: Snap - on success1553 """1554 self.require_ioctx_open()1555 snap_id = c_uint64()1556 ret = run_in_thread(self.librados.rados_ioctx_snap_lookup,1557 (self.io, c_char_p(snap_name), byref(snap_id)))1558 if (ret != 0):1559 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)1560 return Snap(self, snap_name, snap_id)1561 def get_last_version(self):1562 """1563 Return the version of the last object read or written to.1564 This exposes the internal version number of the last object read or1565 written via this io context1566 :returns: version of the last object used1567 """1568 self.require_ioctx_open()1569 return run_in_thread(self.librados.rados_get_last_version, (self.io,))1570 def create_write_op(self):1571 """1572 create write operation object.1573 need call release_write_op after use1574 """1575 self.librados.rados_create_write_op.restype = c_void_p1576 return run_in_thread(self.librados.rados_create_write_op, (None,))1577 def create_read_op(self):1578 """1579 create read operation object.1580 need call release_read_op after use1581 """1582 self.librados.rados_create_read_op.restype = c_void_p1583 return run_in_thread(self.librados.rados_create_read_op, (None,))1584 def release_write_op(self, write_op):1585 """1586 release memory alloc by create_write_op1587 """1588 self.librados.rados_release_write_op.argtypes = [c_void_p]1589 run_in_thread(self.librados.rados_release_write_op, (c_void_p(write_op),))1590 def release_read_op(self, read_op):1591 """1592 release memory alloc by create_read_op1593 :para read_op: read_op object1594 :type: int1595 """1596 self.librados.rados_release_read_op.argtypes = [c_void_p]1597 run_in_thread(self.librados.rados_release_read_op, (c_void_p(read_op),))1598 @requires(('write_op', int), ('keys', tuple), ('values', tuple))1599 def set_omap(self, write_op, keys, values):1600 """1601 set keys values to write_op1602 :para write_op: write_operation object1603 :type write_op: int1604 :para keys: a tuple of keys1605 :type keys: tuple1606 :para values: a tuple of values1607 :type values: tuple1608 """1609 if len(keys) != len(values):1610 raise Error("Rados(): keys and values must have the same number of items")1611 key_num = len(keys)1612 key_array_type = c_char_p*key_num1613 key_array = key_array_type()1614 key_array[:] = keys1615 value_array_type = c_char_p*key_num1616 value_array = value_array_type()1617 value_array[:] = values1618 lens_array_type = c_size_t*key_num1619 lens_array = lens_array_type()1620 for index, value in enumerate(values):1621 lens_array[index] = c_size_t(len(value))1622 run_in_thread(self.librados.rados_write_op_omap_set,1623 (c_void_p(write_op), byref(key_array), byref(value_array),1624 byref(lens_array), c_int(key_num),))1625 @requires(('write_op', int), ('oid', str), ('mtime', opt(int)), ('flags', opt(int)))1626 def operate_write_op(self, write_op, oid, mtime=0, flags=0):1627 """1628 excute the real write operation1629 :para write_op: write operation object1630 :type write_op: int1631 :para oid: object name1632 :type oid: str1633 :para mtime: the time to set the mtime to, 0 for the current time1634 :type mtime: int1635 :para flags: flags to apply to the entire operation1636 :type flags: int1637 """1638 run_in_thread(self.librados.rados_write_op_operate,1639 (c_void_p(write_op), self.io, c_char_p(oid),1640 c_long(mtime), c_int(flags),))1641 @requires(('read_op', int), ('oid', str), ('flag', opt(int)))1642 def operate_read_op(self, read_op, oid, flag=0):1643 """1644 excute the real read operation1645 :para read_op: read operation object1646 :type read_op: int1647 :para oid: object name1648 :type oid: str1649 :para flag: flags to apply to the entire operation1650 :type flag: int1651 """1652 run_in_thread(self.librados.rados_read_op_operate,1653 (c_void_p(read_op), self.io, c_char_p(oid), c_int(flag),))1654 @requires(('read_op', int), ('start_after', str), ('filter_prefix', str), ('max_return', int))1655 def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):1656 """1657 get the omap values1658 :para read_op: read operation object1659 :type read_op: int1660 :para start_after: list keys starting after start_after1661 :type start_after: str1662 :para filter_prefix: list only keys beginning with filter_prefix1663 :type filter_prefix: str1664 :para max_return: list no more than max_return key/value pairs1665 :type max_return: int1666 :returns: an iterator over the the requested omap values, return value from this action1667 """1668 prval = c_int()1669 iter_addr = c_void_p()1670 run_in_thread(self.librados.rados_read_op_omap_get_vals,1671 (c_void_p(read_op), c_char_p(start_after),1672 c_char_p(filter_prefix), c_int(max_return),1673 byref(iter_addr), pointer(prval)))1674 return OmapIterator(self, iter_addr), prval.value1675 @requires(('read_op', int), ('start_after', str), ('max_return', int))1676 def get_omap_keys(self, read_op, start_after, max_return):1677 """1678 get the omap keys1679 :para read_op: read operation object1680 :type read_op: int1681 :para start_after: list keys starting after start_after1682 :type start_after: str1683 :para max_return: list no more than max_return key/value pairs1684 :type max_return: int1685 :returns: an iterator over the the requested omap values, return value from this action1686 """1687 prval = c_int()1688 iter_addr = c_void_p()1689 run_in_thread(self.librados.rados_read_op_omap_get_keys,1690 (c_void_p(read_op), c_char_p(start_after),1691 c_int(max_return), byref(iter_addr), pointer(prval)))1692 return OmapIterator(self, iter_addr), prval.value1693 @requires(('read_op', int), ('keys', tuple))1694 def get_omap_vals_by_keys(self, read_op, keys):1695 """1696 get the omap values by keys1697 :para read_op: read operation object1698 :type read_op: int1699 :para keys: input key tuple1700 :type keys: tuple1701 :returns: an iterator over the the requested omap values, return value from this action1702 """1703 prval = c_int()1704 iter_addr = c_void_p()1705 key_num = len(keys)1706 key_array_type = c_char_p*key_num1707 key_array = key_array_type()1708 key_array[:] = keys1709 run_in_thread(self.librados.rados_read_op_omap_get_vals_by_keys,1710 (c_void_p(read_op), byref(key_array), c_int(key_num),1711 byref(iter_addr), pointer(prval)))1712 return OmapIterator(self, iter_addr), prval.value1713 @requires(('write_op', int), ('keys', tuple))1714 def remove_omap_keys(self, write_op, keys):1715 """1716 remove omap keys specifiled1717 :para write_op: write operation object1718 :type write_op: int1719 :para keys: input key tuple1720 :type keys: tuple1721 """1722 key_num = len(keys)1723 key_array_type = c_char_p*key_num1724 key_array = key_array_type()1725 key_array[:] = keys1726 run_in_thread(self.librados.rados_write_op_omap_rm_keys,1727 (c_void_p(write_op), byref(key_array), c_int(key_num)))1728 @requires(('write_op', int))1729 def clear_omap(self, write_op):1730 """1731 Remove all key/value pairs from an object1732 :para write_op: write operation object1733 :type write_op: int1734 """1735 run_in_thread(self.librados.rados_write_op_omap_clear,1736 (c_void_p(write_op),))1737 @requires(('key', str), ('name', str), ('cookie', str), ('desc', str),1738 ('duration', opt(int)), ('flags', int))1739 def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):1740 """1741 Take an exclusive lock on an object1742 :param key: name of the object1743 :type key: str1744 :param name: name of the lock1745 :type name: str1746 :param cookie: cookie of the lock1747 :type cookie: str1748 :param desc: description of the lock1749 :type desc: str1750 :param duration: duration of the lock in seconds1751 :type duration: int1752 :param flags: flags1753 :type flags: int1754 :raises: :class:`TypeError`1755 :raises: :class:`Error`1756 """1757 self.require_ioctx_open()1758 ret = run_in_thread(self.librados.rados_lock_exclusive,1759 (self.io, c_char_p(key), c_char_p(name), c_char_p(cookie),1760 c_char_p(desc),1761 timeval(duration, None) if duration is None else None,1762 c_uint8(flags)))1763 if ret < 0:1764 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))1765 @requires(('key', str), ('name', str), ('cookie', str), ('tag', str),1766 ('desc', str), ('duration', opt(int)), ('flags', int))1767 def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):1768 """1769 Take a shared lock on an object1770 :param key: name of the object1771 :type key: str1772 :param name: name of the lock1773 :type name: str1774 :param cookie: cookie of the lock1775 :type cookie: str1776 :param tag: tag of the lock1777 :type tag: str1778 :param desc: description of the lock1779 :type desc: str1780 :param duration: duration of the lock in seconds1781 :type duration: int1782 :param flags: flags1783 :type flags: int1784 :raises: :class:`TypeError`1785 :raises: :class:`Error`1786 """1787 self.require_ioctx_open()1788 ret = run_in_thread(self.librados.rados_lock_shared,1789 (self.io, c_char_p(key), c_char_p(name), c_char_p(cookie),1790 c_char_p(tag), c_char_p(desc),1791 timeval(duration, None) if duration is None else None,1792 c_uint8(flags)))1793 if ret < 0:1794 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))1795 @requires(('key', str), ('name', str), ('cookie', str))1796 def unlock(self, key, name, cookie):1797 """1798 Release a shared or exclusive lock on an object1799 :param key: name of the object1800 :type key: str1801 :param name: name of the lock1802 :type name: str1803 :param cookie: cookie of the lock1804 :type cookie: str1805 :raises: :class:`TypeError`1806 :raises: :class:`Error`1807 """1808 self.require_ioctx_open()1809 ret = run_in_thread(self.librados.rados_unlock,1810 (self.io, c_char_p(key), c_char_p(name), c_char_p(cookie)))1811 if ret < 0:1812 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))1813def set_object_locator(func):1814 def retfunc(self, *args, **kwargs):1815 if self.locator_key is not None:1816 old_locator = self.ioctx.get_locator_key()1817 self.ioctx.set_locator_key(self.locator_key)1818 retval = func(self, *args, **kwargs)1819 self.ioctx.set_locator_key(old_locator)1820 return retval1821 else:1822 return func(self, *args, **kwargs)1823 return retfunc1824def set_object_namespace(func):1825 def retfunc(self, *args, **kwargs):1826 if self.nspace is None:1827 raise LogicError("Namespace not set properly in context")1828 old_nspace = self.ioctx.get_namespace()1829 self.ioctx.set_namespace(self.nspace)1830 retval = func(self, *args, **kwargs)1831 self.ioctx.set_namespace(old_nspace)1832 return retval1833 return retfunc1834class Object(object):1835 """Rados object wrapper, makes the object look like a file"""1836 def __init__(self, ioctx, key, locator_key=None, nspace=None):1837 self.key = key1838 self.ioctx = ioctx1839 self.offset = 01840 self.state = "exists"1841 self.locator_key = locator_key1842 self.nspace = "" if nspace is None else nspace1843 def __str__(self):1844 return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \1845 (str(self.ioctx), self.key, "--default--"1846 if self.nspace is "" else self.nspace, self.locator_key)1847 def require_object_exists(self):1848 if self.state != "exists":1849 raise ObjectStateError("The object is %s" % self.state)1850 @set_object_locator1851 @set_object_namespace1852 def read(self, length=1024 * 1024):1853 self.require_object_exists()1854 ret = self.ioctx.read(self.key, length, self.offset)1855 self.offset += len(ret)1856 return ret1857 @set_object_locator1858 @set_object_namespace1859 def write(self, string_to_write):1860 self.require_object_exists()1861 ret = self.ioctx.write(self.key, string_to_write, self.offset)1862 if ret == 0:1863 self.offset += len(string_to_write)1864 return ret1865 @set_object_locator1866 @set_object_namespace1867 def remove(self):1868 self.require_object_exists()1869 self.ioctx.remove_object(self.key)1870 self.state = "removed"1871 @set_object_locator1872 @set_object_namespace1873 def stat(self):1874 self.require_object_exists()1875 return self.ioctx.stat(self.key)1876 def seek(self, position):1877 self.require_object_exists()1878 self.offset = position1879 @set_object_locator1880 @set_object_namespace1881 def get_xattr(self, xattr_name):1882 self.require_object_exists()1883 return self.ioctx.get_xattr(self.key, xattr_name)1884 @set_object_locator1885 @set_object_namespace1886 def get_xattrs(self):1887 self.require_object_exists()1888 return self.ioctx.get_xattrs(self.key)1889 @set_object_locator1890 @set_object_namespace1891 def set_xattr(self, xattr_name, xattr_value):1892 self.require_object_exists()1893 return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)1894 @set_object_locator1895 @set_object_namespace1896 def rm_xattr(self, xattr_name):1897 self.require_object_exists()1898 return self.ioctx.rm_xattr(self.key, xattr_name)1899MONITOR_LEVELS = [1900 "debug",1901 "info",1902 "warn", "warning",1903 "err", "error",1904 "sec",1905 ]1906class MonitorLog(object):1907 """1908 For watching cluster log messages. Instantiate an object and keep1909 it around while callback is periodically called. Construct with1910 'level' to monitor 'level' messages (one of MONITOR_LEVELS).1911 arg will be passed to the callback.1912 callback will be called with:1913 arg (given to __init__)1914 line (the full line, including timestamp, who, level, msg)1915 who (which entity issued the log message)1916 timestamp_sec (sec of a struct timespec)1917 timestamp_nsec (sec of a struct timespec)1918 seq (sequence number)1919 level (string representing the level of the log message)1920 msg (the message itself)1921 callback's return value is ignored1922 """1923 def monitor_log_callback(self, arg, line, who, sec, nsec, seq, level, msg):1924 """1925 Local callback wrapper, in case we decide to do something1926 """1927 self.callback(arg, line, who, sec, nsec, seq, level, msg)1928 return 01929 def __init__(self, cluster, level, callback, arg):1930 if level not in MONITOR_LEVELS:1931 raise LogicError("invalid monitor level " + level)1932 if not callable(callback):1933 raise LogicError("callback must be a callable function")1934 self.level = level1935 self.callback = callback1936 self.arg = arg1937 callback_factory = CFUNCTYPE(c_int, # return type (really void)1938 c_void_p, # arg1939 c_char_p, # line1940 c_char_p, # who1941 c_uint64, # timestamp_sec1942 c_uint64, # timestamp_nsec1943 c_ulong, # seq1944 c_char_p, # level1945 c_char_p) # msg1946 self.internal_callback = callback_factory(self.monitor_log_callback)1947 r = run_in_thread(cluster.librados.rados_monitor_log,1948 (cluster.cluster, level, self.internal_callback, arg))1949 if r:...

Full Screen

Full Screen

test_all.py

Source:test_all.py Github

copy

Full Screen

...4import pydron5import logging6from twisted.internet import threads7logging.basicConfig(level=logging.DEBUG)8def run_in_thread(func):9 def wrapper(*args, **kwargs):10 return threads.deferToThread(func, *args, **kwargs)11 return wrapper12class IntegrationTests(unittest.TestCase):13 14 @utwist.with_reactor15 @run_in_thread16 def test_default_return_value(self):17 @pydron.schedule18 def target():19 pass20 self.assertIsNone(target())21 22 ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tox automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful