How to use _recover_tasks method in autotest

Best Python code snippet using autotest_python Github


Full Screen

1# Local server2from __future__ import annotations3import io4import os5import signal6import time7import anyio8from import SocketAttribute9from moat.util import DelayedRead, DelayedWrite, create_queue10try:11 from contextlib import asynccontextmanager12except ImportError:13 from async_generator import asynccontextmanager14import logging15from import Mapping16from functools import partial17from pprint import pformat18from typing import Any, Dict19from asyncactor import (20 Actor,21 DetagEvent,22 GoodNodeEvent,23 RawMsgEvent,24 RecoverEvent,25 TagEvent,26 UntagEvent,27)28from asyncactor.backend import get_transport29from moat.util import (30 MsgReader,31 MsgWriter,32 NotGiven,33 P,34 Path,35 PathLongener,36 PathShortener,37 ValueEvent,38 attrdict,39 byte2num,40 combine_dict,41 drop_dict,42 gen_ssl,43 num2byte,44 run_tcp_server,45)46from range_set import RangeSet47from . import _version_tuple48from . import client as distkv_client # needs to be mock-able49from .actor.deletor import DeleteActor50from .backend import get_backend51from .codec import packer, stream_unpacker, unpacker52from .default import CFG53from .exceptions import (54 ACLError,55 CancelledError,56 ClientChainError,57 ClientError,58 NoAuthError,59 ServerClosedError,60 ServerConnectionError,61 ServerError,62)63from .model import Node, NodeEvent, NodeSet, UpdateEvent, Watcher64from .types import ACLFinder, ACLStepper, ConvNull, NullACL, RootEntry65Stream = = anyio.ClosedResourceError67ExceptionGroup = anyio.ExceptionGroup68_client_nr = 069SERF_MAXLEN = 45070SERF_LEN_DELTA = 1571def max_n(a, b):72 if a is None:73 return b74 elif b is None:75 return a76 elif a < b:77 return b78 else:79 return a80def cmp_n(a, b):81 if a is None:82 a = -183 if b is None:84 b = -185 return b - a86class HelloProc:87 """88 A hacked-up command processor for receiving the first client message.89 """90 def __init__(self, client):91 self.client = client92 async def received(self, msg):93 qlen = msg.get("qlen", 0)94 self.client.qlen = min(qlen, self.client.server.cfg.server.buffer)95 del self.client.in_stream[0]96 async def aclose(self):97 self.client.in_stream.pop(0, None)98class StreamCommand:99 """Represent the execution of a streamed command.100 Implement the actual command by overriding ``run``.101 Read the next input message by calling ``recv``.102 This auto-detects whether the client sends multiple lines, by closing103 the incoming channel if there's no state=start in the command.104 Selection of outgoing multiline-or-not must be done beforehand,105 by setting ``.multiline``: either statically in a subclass, or106 overriding ``__call__``.107 """108 multiline = False109 send_q = None110 _scope = None111 end_msg = None112 qr = None113 dw = None114 def __new__(cls, client, msg):115 if cls is StreamCommand:116 cls = globals()["SCmd_" + msg.action] # pylint: disable=self-cls-assignment117 return cls(client, msg)118 else:119 return object.__new__(cls)120 def __init__(self, client, msg):121 self.client = client122 self.msg = msg123 self.seq = msg.seq124 self.client.in_stream[self.seq] = self125 self.qlen = self.client.qlen126 if self.qlen:127 self.qr = DelayedRead(self.qlen, get_seq=self._get_seq, send_ack=self._send_ack)128 self.dw = DelayedWrite(self.qlen)129 else:130 self.qr = create_queue(1)131 @staticmethod132 def _get_seq(msg):133 return msg.get("wseq", 0)134 async def _send_ack(self, seq):135 await self.client.send(seq=self.seq, state="ack", ack=seq)136 async def received(self, msg):137 """Receive another message from the client"""138 s = msg.get("state", "")139 if s == "ack":140 if self.dw is not None:141 await self.dw.recv_ack(msg["ack"])142 return143 err = msg.get("error", None)144 if err:145 await self.qr.put(msg)146 if s == "end":147 self.end_msg = msg148 await self.aclose()149 elif not err:150 await self.qr.put(msg)151 async def aclose(self):152 self.client.in_stream.pop(self.seq, None)153 await self.qr.close_sender()154 async def recv(self):155 msg = await self.qr.get()156 if "error" in msg:157 raise ClientError(msg.error)158 return msg159 async def send(self, **msg):160 """Send a message to the client."""161 msg["seq"] = self.seq162 if not self.multiline:163 if self.multiline is None:164 raise RuntimeError("Non-Multiline tried to send twice")165 self.multiline = None166 elif self.multiline == -1:167 raise RuntimeError("Can't explicitly send in simple interaction")168 try:169 if self.dw is not None:170 msg["wseq"] = await self.dw.next_seq()171 await self.client.send(msg)172 except ClosedResourceError:173"OERR %d", self.client._client_nr)174 async def __call__(self, **kw):175 msg = self.msg176 if msg.get("state") != "start":177 # single message178 await self.qr.close_sender()179 qlen = msg.get("qlen", 0)180 if qlen > 0:181 self.dw = DelayedWrite(qlen)182 if self.multiline > 0:183 await self.send(state="start")184 try:185 res = await**kw)186 if res is not None:187 await self.send(**res)188 except Exception as exc:189 if not isinstance(exc, CancelledError):190 self.client.logger.exception("ERS%d %r", self.client._client_nr, self.msg)191 await self.send(error=repr(exc))192 finally:193 with anyio.move_on_after(2, shield=True):194 try:195 await self.send(state="end")196 except anyio.BrokenResourceError:197 pass198 else:199 res = await**kw)200 if res is None:201 if self.multiline is None:202 return203 res = {}204 if self.multiline is None:205 raise RuntimeError("Can't explicitly send in single-line reply")206 if self.multiline < 0:207 return res208 res["seq"] = self.seq209 await self.send(**res)210 async def run(self):211 raise RuntimeError("Do implement me!")212class SingleMixin:213 """This is a mix-in that transforms a StreamCommand into something that214 doesn't."""215 multiline = -1216 async def __call__(self, **kw):217 await self.aclose()218 return await super().__call__(**kw)219class SCmd_auth(StreamCommand):220 """221 Perform user authorization.222 root: sub-root directory223 typ: auth method (root)224 ident: user identifier (*)225 plus any other data the client-side auth object sends226 This call cannot be used to re-authenticate. The code will go227 through the motions but not actually do anything, thus you can228 non-destructively test an updated authorization.229 """230 multiline = True231 noAuth = True232 async def run(self):233 from .auth import loader234 msg = self.msg235 client = self.client236 if client._user is not None:237 await client._user.auth_sub(msg)238 return239 root = msg.get("root", Path())240 auth = client.root.follow(root + (None, "auth"), nulls_ok=2, create=False)241 if client.user is None:242 a =["current"]243 if msg.typ != a and client.user is None:244 raise RuntimeError("Wrong auth type", a)245 data = auth.follow(Path(msg.typ, "user", msg.ident), create=False)246 cls = loader(msg.typ, "user", server=True)247 user = cls.load(data)248 client._user = user249 try:250 await user.auth(self, msg)251 if client.user is None:252 client._chroot(root)253 client.user = user254 client.conv = user.aux_conv(data, client.root)255 client.acl = user.aux_acl(data, client.root)256 finally:257 client._user = None258class SCmd_auth_list(StreamCommand):259 """260 List auth data.261 root: sub-root directory262 typ: auth method (root)263 kind: type of data to read('user')264 ident: user identifier (foo) (if missing: return all)265 """266 multiline = True267 async def send_one(self, data, nchain=-1):268 from .auth import loader269 typ, kind, ident = data.path[-3:]270 cls = loader(typ, kind, server=True, make=False)271 user = cls.load(data)272 res = res["typ"] = typ274 res["kind"] = kind275 res["ident"] = ident276 if data.chain is not None and nchain != 0:277 res["chain"] = data.chain.serialize(nchain=nchain)278 await self.send(**res)279 async def run(self):280 msg = self.msg281 client = self.client282 if not client.user.can_auth_read:283 raise RuntimeError("Not allowed")284 nchain = msg.get("nchain", 0)285 root = msg.get("root", ())286 if root and not self.client.user.is_super_root:287 raise RuntimeError("Cannot read tenant users")288 kind = msg.get("kind", "user")289 auth = client.root.follow(root + (None, "auth"), nulls_ok=2, create=False)290 if "ident" in msg:291 data = auth.follow(Path(msg.typ, kind, msg.ident), create=False)292 await self.send_one(data, nchain=nchain)293 else:294 d = auth.follow(Path(msg.typ, kind), create=False)295 for data in d.values():296 await self.send_one(data, nchain=nchain)297 async def __call__(self, **kw):298 # simplify for single-value result299 msg = self.msg300 self.multiline = "ident" not in msg301 return await super().__call__(**kw)302class SCmd_auth_get(StreamCommand):303 """304 Read auth data.305 root: sub-root directory306 typ: auth method (root)307 kind: type of data to read('user')308 ident: user identifier (foo)309 chain: change history310 plus any other data the client-side manager object sends311 """312 multiline = False313 async def run(self):314 from .auth import loader315 msg = self.msg316 client = self.client317 if not client.user.can_auth_read:318 raise RuntimeError("Not allowed")319 root = msg.get("root", ())320 if root and not self.client.user.is_super_root:321 raise RuntimeError("Cannot read tenant users")322 kind = msg.get("kind", "user")323 auth = client.root.follow(root + (None, "auth"), nulls_ok=2, create=False)324 data = auth.follow(Path(msg.typ, kind, msg.ident), create=False)325 cls = loader(msg.typ, kind, server=True, make=False)326 user = cls.load(data)327 res = nchain = msg.get("nchain", 0)329 if nchain:330 res["chain"] = data.chain.serialize(nchain=nchain)331 return res332class SCmd_auth_set(StreamCommand):333 """334 Write auth data.335 root: sub-root directory336 typ: auth method (root)337 kind: type of data to read('user')338 ident: user identifier (foo)339 chain: change history340 plus any other data the client sends341 """342 multiline = True343 async def run(self):344 from .auth import loader345 msg = self.msg346 client = self.client347 if not client.user.can_auth_write:348 raise RuntimeError("Not allowed")349 root = msg.get("root", ())350 if root and not self.client.user.is_super_root:351 raise RuntimeError("Cannot write tenant users")352 kind = msg.get("kind", "user")353 cls = loader(msg.typ, kind, server=True, make=True)354 auth = client.root.follow(root + (None, "auth"), nulls_ok=2, create=True)355 data = auth.follow(Path(msg.typ, kind, msg.ident), create=True)356 user = cls.load(data)357 val = val = drop_dict(val, msg.pop("drop", ()))359 val = combine_dict(msg, val)360 user = await cls.recv(self, val)361 msg.value = msg.path = (*root, None, "auth", msg.typ, kind, user.ident)363 return await client.cmd_set_value(msg, _nulls_ok=True)364class SCmd_get_tree(StreamCommand):365 """366 Get a subtree.367 path: position to start to enumerate.368 min_depth: tree depth at which to start returning results. Default 0=path location.369 max_depth: tree depth at which to not go deeper. Default +inf=everything.370 nchain: number of change chain entries to return. Default 0=don't send chain data.371 The returned data is PathShortened.372 """373 multiline = True374 async def run(self, root=None): # pylint: disable=arguments-differ375 msg = self.msg376 client = self.client377 if root is None:378 root = client.root379 entry, acl = root.follow_acl(380 msg.path, create=False, nulls_ok=client.nulls_ok, acl=client.acl, acl_key="e"381 )382 else:383 entry, _ = root.follow_acl(msg.path, create=False, nulls_ok=client.nulls_ok)384 acl = NullACL385 kw = {}386 nchain = msg.get("nchain", 0)387 ps = PathShortener(entry.path)388 max_depth = msg.get("max_depth", None)389 empty = msg.get("empty", False)390 conv = client.conv391 if max_depth is not None:392 kw["max_depth"] = max_depth393 min_depth = msg.get("min_depth", None)394 if min_depth is not None:395 kw["min_depth"] = min_depth396 kw["full"] = empty397 async def send_sub(entry, acl):398 if is NotGiven and not empty:399 return400 res = entry.serialize(chop_path=client._chop_path, nchain=nchain, conv=conv)401 if not acl.allows("r"):402 res.pop("value", None)403 ps(res)404 await self.send(**res)405 if not acl.allows("e"):406 raise StopAsyncIteration407 if not acl.allows("x"):408 acl.block("r")409 await entry.walk(send_sub, acl=acl, **kw)410class SCmd_get_tree_internal(SCmd_get_tree):411 """Get a subtree (internal data)."""412 async def run(self): # pylint: disable=arguments-differ413 return await super().run(root=self.client.metaroot)414class SCmd_watch(StreamCommand):415 """416 Monitor a subtree for changes.417 If ``state`` is set, dump the initial state before reporting them.418 path: position to start to monitor.419 nchain: number of change chain entries to return. Default 0=don't send chain data.420 state: flag whether to send the current subtree before reporting changes. Default False.421 The returned data is PathShortened.422 The current state dump may not be consistent; always process changes.423 """424 multiline = True425 async def run(self):426 msg = self.msg427 client = self.client428 conv = client.conv429 entry, acl = client.root.follow_acl(430 msg.path, acl=client.acl, acl_key="x", create=True, nulls_ok=client.nulls_ok431 )432 nchain = msg.get("nchain", 0)433 max_depth = msg.get("max_depth", -1)434 min_depth = msg.get("min_depth", 0)435 empty = msg.get("empty", False)436 async with Watcher(entry) as watcher:437 async with anyio.create_task_group() as tg:438 tock = client.server.tock439 shorter = PathShortener(entry.path)440 if msg.get("fetch", False):441 async def orig_state():442 kv = {"max_depth": max_depth, "min_depth": min_depth}443 async def worker(entry, acl):444 if is NotGiven and not empty:445 return446 if entry.tock < tock:447 res = entry.serialize(448 chop_path=client._chop_path, nchain=nchain, conv=conv449 )450 shorter(res)451 if not acl.allows("r"):452 res.pop("value", None)453 await self.send(**res)454 if not acl.allows("e"):455 raise StopAsyncIteration456 if not acl.allows("x"):457 acl.block("r")458 await entry.walk(worker, acl=acl, **kv)459 await self.send(state="uptodate")460 tg.start_soon(orig_state)461 async for m in watcher:462 ml = len(m.entry.path) - len(msg.path)463 if ml < min_depth:464 continue465 if max_depth >= 0 and ml > max_depth:466 continue467 a = acl468 for p in getattr(m, "path", [])[shorter.depth :]:469 if not a.allows("e"):470 break471 if not acl.allows("x"):472 a.block("r")473 a = a.step(p)474 else:475 res = m.entry.serialize(476 chop_path=client._chop_path, nchain=nchain, conv=conv477 )478 shorter(res)479 if not a.allows("r"):480 res.pop("value", None)481 await self.send(**res)482class SCmd_msg_monitor(StreamCommand):483 """484 Monitor a topic for changes.485 This is a pass-through command.486 """487 multiline = True488 async def run(self):489 msg = self.msg490 raw = msg.get("raw", False)491 topic = msg.topic492 if isinstance(topic, str):493 topic = P(topic)494 if len(topic) and topic[0][0] == ":":495 topic = P(self.client.server.cfg.root) + topic496 async with self.client.server.backend.monitor(*topic) as stream:497 async for resp in stream:498 if hasattr(resp, "topic"):499 t = resp.topic500 if isinstance(t, str):501 t = t.split(".")502 else:503 t = topic504 res = {"topic": t}505 if raw:506 res["raw"] = resp.payload507 else:508 try:509 res["data"] = unpacker(resp.payload)510 except Exception as exc:511 res["raw"] = resp.payload512 res["error"] = repr(exc)513 await self.send(**res)514class ServerClient:515 """Represent one (non-server) client."""516 is_chroot = False517 _user = None # user during auth518 user = None # authorized user519 _dh_key = None520 conv = ConvNull521 acl: ACLStepper = NullACL522 tg = None523 qlen = 0524 def __init__(self, server: "Server", stream: Stream):525 self.server = server526 self.root = server.root527 self.metaroot = self.root.follow(Path(None), create=True, nulls_ok=True)528 = stream529 self.tasks = {}530 self.in_stream = {0: HelloProc(self)}531 self._chop_path = 0532 self._send_lock = anyio.Lock()533 global _client_nr534 _client_nr += 1535 self._client_nr = _client_nr536 self.logger = server.logger537 @property538 def nulls_ok(self):539 if self.is_chroot:540 return False541 if None not in self.root:542 return 2543 if self.user.is_super_root:544 return True545 # TODO test for superuser-ness, if so return True546 return False547 async def _process(self, fn, msg):548 res = await fn(msg)549 if res is None:550 res = {}551 elif not isinstance(res, dict):552 res = {"result": res}553 res["seq"] = msg.seq554 await self.send(res)555 async def process(self, msg, evt=None):556 """557 Process an incoming message.558 """559 needAuth = self.user is None or self._user is not None560 self.logger.debug("IN_%d %s", self._client_nr, msg)561 seq = msg.seq562 with anyio.CancelScope() as s:563 self.tasks[seq] = s564 if "chain" in msg:565 msg.chain = NodeEvent.deserialize(msg.chain, cache=self.server.node_cache)566 fn = None567 if msg.get("state", "") != "start":568 fn = getattr(self, "cmd_" + str(msg.action), None)569 if fn is None:570 fn = StreamCommand(self, msg)571 if needAuth and not getattr(fn, "noAuth", False):572 raise NoAuthError()573 else:574 if needAuth and not getattr(fn, "noAuth", False):575 raise NoAuthError()576 fn = partial(self._process, fn, msg)577 if evt is not None:578 evt.set()579 try:580 await fn()581 except (anyio.BrokenResourceError,BrokenPipeError) as exc:582"ERR%d: %s", self._client_nr, repr(exc))583 except Exception as exc:584 if not isinstance(exc, ClientError):585 self.logger.exception("ERR%d: %s", self._client_nr, repr(msg))586 await self.send({"error": repr(exc), "seq": seq})587 finally:588 del self.tasks[seq]589 def _chroot(self, root):590 if not root:591 return592 entry, _acl = self.root.follow_acl(root, acl=self.acl, nulls_ok=False)593 self.root = entry594 self.is_chroot = True595 self._chop_path += len(root)596 async def cmd_diffie_hellman(self, msg):597 if self._dh_key:598 raise RuntimeError("Can't call dh twice")599 from moat.lib.diffiehellman import DiffieHellman600 def gen_key():601 length = msg.get("length", 1024)602 k = DiffieHellman(key_length=length, group=(5 if length < 32 else 14))603 k.generate_public_key()604 k.generate_shared_secret(byte2num(msg.pubkey))605 self._dh_key = num2byte(k.shared_secret)[0:32]606 return k607 async with self.server.crypto_limiter:608 k = await anyio.to_thread.run_sync(gen_key)609 return {"pubkey": num2byte(k.public_key)}610 cmd_diffie_hellman.noAuth = True611 @property612 def dh_key(self):613 if self._dh_key is None:614 raise RuntimeError("The client has not executed DH key exchange")615 return self._dh_key616 async def cmd_fake_info(self, msg):617 msg["node"] = ""618 msg["tick"] = 0619 self.logger.warning("Fake Info LOCAL %s", pformat(msg))620 await self.server.user_info(msg)621 async def cmd_fake_info_send(self, msg):622 msg["node"] = ""623 msg["tick"] = 0624 msg.pop("tock", None)625 self.logger.warning("Fake Info SEND %s", pformat(msg))626 await self.server._send_event("info", msg)627 async def cmd_auth_get(self, msg):628 class AuthGet(SingleMixin, SCmd_auth_get):629 pass630 return await AuthGet(self, msg)()631 async def cmd_auth_set(self, msg):632 class AuthSet(SingleMixin, SCmd_auth_set):633 pass634 return await AuthSet(self, msg)()635 async def cmd_auth_list(self, msg):636 class AuthList(SingleMixin, SCmd_auth_list):637 pass638 return await AuthList(self, msg)()639 async def cmd_auth_info(self, msg):640 msg["path"] = Path(None, "auth")641 return await self.cmd_get_internal(msg)642 async def cmd_root(self, msg):643 """Change to a sub-tree."""644 self._chroot(msg.path)645 return self.root.serialize(chop_path=self._chop_path, conv=self.conv)646 async def cmd_get_internal(self, msg):647 return await self.cmd_get_value(msg, root=self.metaroot, _nulls_ok=True)648 async def cmd_set_internal(self, msg):649 return await self.cmd_set_value(msg, root=self.metaroot, _nulls_ok=True)650 async def cmd_enum_internal(self, msg):651 return await self.cmd_enum(msg, root=self.metaroot, _nulls_ok=True)652 async def cmd_delete_internal(self, msg):653 return await self.cmd_delete_value(msg, root=self.metaroot)654 async def cmd_get_tock(self, msg): # pylint: disable=unused-argument655 return {"tock": self.server.tock}656 async def cmd_test_acl(self, msg):657 """Check which ACL a path matches."""658 root = self.root659 mode = msg.get("mode") or "x"660 acl = self.acl661 acl2 = msg.get("acl", None)662 try:663 _entry, _acl = root.follow_acl(664 msg.path,665 acl=self.acl,666 acl_key="a" if acl2 is None else mode,667 nulls_ok=False,668 create=None,669 )670 if acl2 is not None:671 ok = acl.allows("a") # pylint: disable=no-value-for-parameter # pylint is confused672 acl2 = root.follow(Path(None, "acl", acl2), create=False, nulls_ok=True)673 acl2 = ACLFinder(acl2)674 _entry, acl = root.follow_acl(675 msg.path, acl=acl2, acl_key=mode, nulls_ok=False, create=None676 )677 if not ok:678 acl.block("a")679 acl.check(mode)680 except ACLError:681 return {"access": False}682 else:683 return {"access": if acl.allows("a") else True}684 async def cmd_enum(self, msg, with_data=None, _nulls_ok=None, root=None):685 """Get all sub-nodes."""686 if root is None:687 root = self.root688 if with_data is None:689 with_data = msg.get("with_data", False)690 entry, acl = root.follow_acl(691 msg.path, acl=self.acl, acl_key="e", create=False, nulls_ok=_nulls_ok692 )693 empty = msg.get("empty", False)694 if with_data:695 res = {}696 for k, v in entry.items():697 a = acl.step(k)698 if a.allows("r"):699 if is not NotGiven and acl.allows("x"):700 res[k] = self.conv.enc_value(, entry=v)701 elif empty:702 res[k] = None703 else:704 res = []705 for k, v in entry.items():706 if empty or is not NotGiven:707 a = acl.step(k)708 if a.allows("e"):709 res.append(k)710 return {"result": res}711 cmd_enumerate = cmd_enum # backwards compat: XXX remove712 async def cmd_enum_node(self, msg):713 n = msg.get("max", 0)714 cur = msg.get("current", False)715 node = Node(msg["node"], None, cache=self.server.node_cache, create=False)716 res = list(node.enumerate(n=n, current=cur))717 return {"result": res}718 async def cmd_kill_node(self, msg):719 node = msg["node"]720 node = Node(msg["node"], None, cache=self.server.node_cache, create=False)721 for k in node.enumerate(current=True):722 raise ServerError(f"Node {} has entry {k}")723 await self.server.drop_node( async def cmd_get_value(self, msg, _nulls_ok=None, root=None):725 """Get a node's value."""726 if "node" in msg and "path" not in msg:727 n = Node(msg.node, cache=self.server.node_cache, create=False)728 return n[msg.tick].serialize(729 chop_path=self._chop_path, nchain=msg.get("nchain", 0), conv=self.conv730 )731 if _nulls_ok is None:732 _nulls_ok = self.nulls_ok733 if root is None:734 root = self.root735 try:736 entry, _ = root.follow_acl(737 msg.path, create=False, acl=self.acl, acl_key="r", nulls_ok=_nulls_ok738 )739 except KeyError:740 entry = {}741 if msg.get("nchain", 0):742 entry["chain"] = None743 else:744 entry = entry.serialize(chop_path=-1, nchain=msg.get("nchain", 0), conv=self.conv)745 return entry746 async def cmd_set_value(self, msg, **kw):747 """Set a node's value."""748 if "value" not in msg:749 raise ClientError("Call 'delete_value' if you want to clear the value")750 return await self._set_value(msg, value=msg.value, **kw)751 async def cmd_delete_value(self, msg, **kw):752 """Delete a node's value."""753 if "value" in msg:754 raise ClientError("A deleted entry can't have a value")755 return await self._set_value(msg, **kw)756 async def _set_value(self, msg, value=NotGiven, root=None, _nulls_ok=False):757 # TODO drop this as soon as we have server-side user mods758 if self.user.is_super_root and root is None:759 _nulls_ok = 2760 if root is None:761 root = self.root762 acl = self.acl763 else:764 acl = NullACL765 entry, acl = root.follow_acl(msg.path, acl=acl, acl_key="W", nulls_ok=_nulls_ok)766 if root is self.root and "match" in self.metaroot:767 try:768 self.metaroot["match"].check_value(None if value is NotGiven else value, entry)769 except ClientError:770 raise771 except Exception as exc:772 self.logger.exception("Err %s: %r", exc, msg)773 raise ClientError(repr(exc)) from None774 # TODO pass exceptions to the client775 send_prev = True776 nchain = msg.get("nchain", 1)777 if msg.get("idem", False) and type( is type(value) and == value:778 res = attrdict(tock=entry.tock, changed=False)779 if nchain > 0:780 res.chain = entry.chain.serialize(nchain=nchain)781 return res782 if "prev" in msg:783 if != msg.prev:784 raise ClientError(f"Data is { !r} not {msg.prev !r} at {msg.path}")785 send_prev = False786 if "chain" in msg:787 if msg.chain is None:788 if is not NotGiven:789 raise ClientChainError(f"Entry already exists at {msg.path}")790 elif is NotGiven:791 raise ClientChainError(f"Entry is new at {msg.path}")792 elif entry.chain != msg.chain:793 raise ClientChainError(794 f"Chain is {entry.chain !r} not {msg.chain !r} for {msg.path}"795 )796 send_prev = False797 res = attrdict()798 if value is NotGiven:799 res.changed = is not NotGiven800 else:801 res.changed = != value802 if send_prev and is not NotGiven:803 res.prev = self.conv.enc_value(, entry=entry)804 nchain = msg.get("nchain", 1)805 value = msg.get("value", NotGiven)806 async with self.server.next_event() as event:807 await entry.set_data(808 event,809 NotGiven if value is NotGiven else self.conv.dec_value(value, entry=entry),810 server=self.server,811 tock=self.server.tock,812 )813 if nchain != 0:814 res.chain = entry.chain.serialize(nchain=nchain)815 res.tock = entry.tock816 return res817 async def cmd_update(self, msg):818 """819 Apply a stored update.820 You usually do this via a stream command.821 """822 msg = UpdateEvent.deserialize(823 self.root, msg, nulls_ok=self.nulls_ok, conv=self.conv, cache=self.server._nodes824 )825 res = await msg.entry.apply(msg, server=self, root=self.root)826 if res is None:827 return False828 else:829 return res.serialize(chop_path=self._chop_path, conv=self.conv)830 async def cmd_check_deleted(self, msg):831 nodes = msg.nodes832 deleted = NodeSet()833 for n, v in nodes.items():834 n = Node(n, None, cache=self.server.node_cache)835 r = RangeSet()836 r.__setstate__(v)837 for a, b in r:838 for t in range(a, b):839 if t not in n:840 deleted.add(, t)841 if deleted:842 await self.server._send_event("info", attrdict(deleted=deleted.serialize()))843 async def cmd_get_state(self, msg):844 """Return some info about this node's internal state"""845 return await self.server.get_state(**msg)846 async def cmd_msg_send(self, msg):847 topic = msg.topic848 if isinstance(topic, str):849 topic = (topic,)850 if topic[0][0] == ":":851 topic = P(self.server.cfg.root) + topic852 if "raw" in msg:853 assert "data" not in msg854 data = msg.raw855 else:856 data = packer( await self.server.backend.send(*topic, payload=data)858 async def cmd_delete_tree(self, msg):859 """Delete a node's value.860 Sub-nodes are cleared (after their parent).861 """862 seq = msg.seq863 if not msg.path:864 raise ClientError("You can't delete the root node")865 nchain = msg.get("nchain", 0)866 if nchain:867 await self.send({"seq": seq, "state": "start"})868 ps = PathShortener(msg.path)869 try:870 entry, acl = self.root.follow_acl(871 msg.path, acl=self.acl, acl_key="d", nulls_ok=self.nulls_ok872 )873 except KeyError:874 return False875 async def _del(entry, acl):876 res = 0877 if is not None and acl.allows("d"):878 async with self.server.next_event() as event:879 evt = await entry.set_data(event, NotGiven, server=self, tock=self.server.tock)880 if nchain:881 r = evt.serialize(882 chop_path=self._chop_path, nchain=nchain, with_old=True, conv=self.conv883 )884 r["seq"] = seq885 r.pop("new_value", None) # always None886 ps(r)887 await self.send(r)888 res += 1889 if not acl.allows("e") or not acl.allows("x"):890 return891 for v in entry.values():892 a = acl.step(v, new=True)893 res += await _del(v, a)894 return res895 res = await _del(entry, acl)896 if nchain:897 await self.send({"seq": seq, "state": "end"})898 else:899 return {"changed": res}900 async def cmd_log(self, msg):901 await self.server.run_saver(path=msg.path, save_state=msg.get("fetch", False))902 return True903 async def cmd_save(self, msg):904 full = msg.get("full", False)905 await, full=full)906 return True907 async def cmd_stop(self, msg):908 try:909 t = self.tasks[msg.task]910 except KeyError:911 return False912 t.cancel()913 return True914 async def cmd_set_auth_typ(self, msg):915 if not self.user.is_super_root:916 raise RuntimeError("You're not allowed to do that")917 a = self.root.follow(Path(None, "auth"), nulls_ok=True)918 if is NotGiven:919 val = {}920 else:921 val = if msg.typ is None:923 val.pop("current", None)924 elif msg.typ not in a or not len(a[msg.typ]["user"].keys()):925 raise RuntimeError("You didn't configure this method yet:" + repr((msg.typ, vars(a))))926 else:927 val["current"] = msg.typ928 msg.value = val929 msg.path = (None, "auth")930 return await self.cmd_set_value(msg, _nulls_ok=True)931 async def send(self, msg):932 self.logger.debug("OUT%d %s", self._client_nr, msg)933 if self._send_lock is None:934 return935 async with self._send_lock:936 if self._send_lock is None:937 # yes this can happen, when the connection is torn down938 return939 if "tock" not in msg:940 msg["tock"] = self.server.tock941 try:942 await except ClosedResourceError:944"ERO%d %r", self._client_nr, msg)945 self._send_lock = None946 raise947 async def send_result(self, seq, res):948 res["seq"] = seq949 if "tock" in res:950 await self.server.tock_seen(res["tock"])951 else:952 res["tock"] = self.server.tock953 await self.send(res)954 async def run(self):955 """Main loop for this client connection."""956 unpacker_ = stream_unpacker() # pylint: disable=redefined-outer-name957 async with anyio.create_task_group() as tg:958 = tg959 msg = {960 "seq": 0,961 "version": _version_tuple,962 "node":,963 "tick": self.server.node.tick,964 "tock": self.server.tock,965 "qlen": self.server.cfg.server.buffer,966 }967 try:968 auth = self.root.follow(Path(None, "auth"), nulls_ok=True, create=False)969 except KeyError:970 a = None971 else:972 auths = list(auth.keys())973 try:974 a =["current"]975 except (ValueError, KeyError, IndexError, TypeError):976 a = None977 else:978 try:979 auths.remove(a)980 except ValueError:981 a = None982 auths.insert(0, a)983 msg["auth"] = auths984 if a is None:985 from .auth import RootServerUser986 self.user = RootServerUser()987 await self.send(msg)988 while True:989 for msg in unpacker_:990 seq = None991 try:992 seq = msg.seq993 send_q = self.in_stream.get(seq, None)994 if send_q is not None:995 await send_q.received(msg)996 else:997 evt = anyio.Event()998, msg, evt)999 await evt.wait()1000 except Exception as exc:1001 msg = {"error": str(exc)}1002 if isinstance(exc, ClientError): # pylint doesn't seem to see this, so …:1003 msg["etype"] = exc.etype # pylint: disable=no-member ### YES IT HAS1004 else:1005 self.logger.exception(1006 "ERR %d: Client error on %s", self._client_nr, repr(msg)1007 )1008 if seq is not None:1009 msg["seq"] = seq1010 await self.send(msg)1011 try:1012 buf = await except (anyio.BrokenResourceError,ConnectionResetError):1014"DEAD %d", self._client_nr)1015 break1016 if len(buf) == 0: # Connection was closed.1017 self.logger.debug("CLOSED %d", self._client_nr)1018 break1019 unpacker_.feed(buf)1020 tg.cancel_scope.cancel()1021 def drop_old_event(self, evt, old_evt=NotGiven):1022 return self.server.drop_old_event(evt, old_evt)1023 def mark_deleted(self, node, tick):1024 return self.server.mark_deleted(node, tick)1025class _RecoverControl:1026 _id = 01027 def __init__(self, server, scope, prio, local_history, sources):1028 self.server = server1029 self.scope = scope1030 self.prio = prio1031 local_history = set(local_history)1032 sources = set(sources)1033 self.local_history = local_history - sources1034 self.sources = sources - local_history1035 self.tock = server.tock1036 type(self)._id += 11037 self._id = type(self)._id1038 self._waiters = {}1039 async def _start(self):1040 chk = set()1041 rt = self.server._recover_tasks1042 for node in self.local_history:1043 xrc = rt.get(node, None)1044 if xrc is not None:1045 chk.add(xrc)1046 self.server._recover_tasks[node] = self1047 for t in chk:1048 await t._check()1049 async def _check(self):1050 lh = []1051 rt = self.server._recover_tasks1052 for n in self.local_history:1053 if rt.get(n, None) is self:1054 lh.append(n)1055 self.local_history = lh1056 if not lh:1057 self.cancel()1058 def __hash__(self):1059 return id(self)1060 def cancel(self):1061 self.scope.cancel()1062 rt = self.server._recover_tasks1063 for node in self.local_history:1064 if rt.get(node, None) is self:1065 del rt[node]1066 self.local_history = ()1067 for evt in list(self._waiters.values()):1068 evt.set()1069 def set(self, n):1070 evt = self._waiters.get(n, None)1071 if evt is None:1072 evt = anyio.Event()1073 self._waiters[n] = evt1074 evt.set()1075 async def wait(self, n):1076 evt = self._waiters.get(n, None)1077 if evt is None:1078 evt = anyio.Event()1079 self._waiters[n] = evt1080 await evt.wait()1081class Server:1082 """1083 This is the DistKV server. It manages connections to the Serf/MQTT server,1084 the DistKV clients, and (optionally) logs all changes to a file.1085 Args:1086 name (str): the name of this DistKV server instance.1087 It **must** be unique.1088 cfg: configuration.1089 See :attr:`distkv.default.CFG` for default values.1090 Relevant is the ``server`` sub-dict (mostly).1091 init (Any):1092 The initial content of the root entry. **Do not use this**, except1093 when setting up an entirely new DistKV network.1094 """1095 # pylint: disable=no-member # mis-categorizing cfg as tuple1096 backend = None1097 _ready = None1098 _ready2 = None1099 _actor = None1100 _del_actor = None1101 cfg: attrdict = None1102 force_startup: bool = False1103 seen_missing = None1104 fetch_running = None1105 sending_missing = None1106 ports = None1107 _tock = 01108 def __init__(self, name: str, cfg: dict = None, init: Any = NotGiven):1109 self.root = RootEntry(self, tock=self.tock)1110 self.cfg = combine_dict(cfg or {}, CFG, cls=attrdict)1111 if isinstance(self.cfg.server.root, str):1112 self.cfg.server.root = P(self.cfg.server.root)1113 else:1114 self.cfg.server.root = self.paranoid_root = self.root if self.cfg.server.paranoia else None1116 self._nodes: Dict[str, Node] = {}1117 self.node_drop = set()1118 self.node = Node(name, None, cache=self.node_cache)1119 self._init = init1120 self.crypto_limiter = anyio.Semaphore(3)1121 self.logger = logging.getLogger("distkv.server." + name)1122 self._delete_also_nodes = NodeSet()1123 # Lock for generating a new node event1124 self._evt_lock = anyio.Lock()1125 # connected clients1126 self._clients = set()1127 # cache for partial messages1128 self._part_len = SERF_MAXLEN - SERF_LEN_DELTA - len( self._part_seq = 01130 self._part_cache = dict()1131 self._savers = []1132 # This is here, not in _run_del, because _del_actor needs to be accessible early1133 self._del_actor = DeleteActor(self)1134 @property1135 def node_cache(self):1136 """1137 A node cache helper which also removes new nodes from the node_drop set.1138 """1139 class Cache:1140 def __len__(slf): # pylint: disable=no-self-argument1141 return len(self._nodes)1142 def __bool__(slf): # pylint: disable=no-self-argument1143 return len(self._nodes) > 01144 def __contains__(slf, k): # pylint: disable=no-self-argument1145 return k in self._nodes1146 def __getitem__(slf, k): # pylint: disable=no-self-argument1147 return self._nodes[k]1148 def __setitem__(slf, k, v): # pylint: disable=no-self-argument1149 self._nodes[k] = v1150 self.node_drop.discard(k)1151 def __delitem__(slf, k): # pylint: disable=no-self-argument1152 del self._nodes[k]1153 self.node_drop.add(k)1154 def get(slf, *k): # pylint: disable=no-self-argument1155 return self._nodes.get(*k)1156 def pop(slf, *k): # pylint: disable=no-self-argument1157 self.node_drop.add(k)1158 return self._nodes.pop(*k)1159 return Cache()1160 @asynccontextmanager1161 async def next_event(self):1162 """A context manager which returns the next event under a lock.1163 This increments ``tock`` because that increases the chance that the1164 node (or split) where something actually happens wins a collision.1165 Rationale: if the event is created and leaks to the environment, it1166 needs to be marked as deleted if incomplete. Otherwise the system1167 sees it as "lost" data.1168 """1169 async with self._evt_lock:1170 n = None1171 try:1172 self.node.tick += 11173 nt = self.node.tick1174 self._tock += 11175 await self._set_tock() # updates actor1176 n = NodeEvent(self.node)1177 yield n1178 except BaseException as exc:1179 if n is not None:1180 self.logger.warning("Deletion %s %d due to %r", self.node, n.tick, exc)1181 self.node.report_deleted(RangeSet((nt,)), self)1182 with anyio.move_on_after(2, shield=True):1183 await self._send_event(1184 "info", dict(node="", tick=0, deleted={ (nt,)})1185 )1186 raise1187 finally:1188 self._tock += 11189 # does not update actor again, once is sufficient1190 @property1191 def tock(self):1192 """Retrieve ``tock``.1193 Also increments it because tock values may not be re-used."""1194 self._tock += 11195 return self._tock1196 async def tock_seen(self, value):1197 """1198 Updates the current tock value so that it is at least ``value``.1199 Args:1200 value (int): some incoming '`tock``.1201 """1202 if value is None:1203 return1204 if self._tock < value:1205 self._tock = value1206 await self._set_tock()1207 async def _set_tock(self):1208 if self._actor is not None and self._ready.is_set():1209 await self._actor.set_value((self._tock, self.node.tick))1210 async def del_check(self, value):1211 """1212 Called when ``(None,"actor","del")`` is set.1213 """1214 if value is NotGiven:1215 await self._del_actor.disable()1216 return1217 nodes = value.get("nodes", ())1218 if in nodes:1219 await self._del_actor.enable(len(nodes))1220 else:1221 await self._del_actor.disable(len(nodes))1222 def drop_old_event(self, evt, old_evt=NotGiven):1223 """1224 Drop either one event, or any event that is in ``old_evt`` but not1225 in ``evt``.1226 """1227 if old_evt is None:1228 return1229 if old_evt is NotGiven:1230 evt.node.supersede(evt.tick)1231 return1232 nt = {}1233 while evt is not None:1234 assert not in nt1235 nt[] = evt.tick1236 evt = evt.prev1237 while old_evt is not None:1238 if nt.get(, 0) != old_evt.tick:1239 old_evt.node.supersede(old_evt.tick)1240 old_evt = old_evt.prev1241 async def _send_event(self, action: str, msg: dict):1242 """1243 Helper to send an event to the backend's ``action`` endpoint.1244 Args:1245 action (str): the endpoint to send to. Prefixed by ``cfg.root``.1246 msg: the message to send.1247 """1248 if "tock" not in msg:1249 msg["tock"] = self.tock1250 else:1251 await self.tock_seen(msg["tock"])1252 if "node" not in msg:1253 msg["node"] = self.node.name1254 if "tick" not in msg:1255 msg["tick"] = self.node.tick1256 self.logger.debug("Send %s: %r", action, msg)1257 for m in self._pack_multiple(msg):1258 await self.backend.send(*self.cfg.server.root, action, payload=m)1259 async def watcher(self):1260 """1261 The background task that watches a (sub)tree for changes.1262 """1263 async with Watcher(self.root, q_len=0, full=True) as watch:1264 async for msg in watch:1265 self.logger.debug("Watch: %r", msg)1266 if msg.event.node != self.node:1267 continue1268 if self.node.tick is None:1269 continue1270 p = msg.serialize(nchain=self.cfg.server.change.length)1271 await self._send_event("update", p)1272 async def resync_deleted(self, nodes):1273 """1274 Owch. We need to re-sync.1275 We collect the latest ticks in our object tree and send them to one1276 of the Delete nodes.1277 """1278 for n in nodes:1279 try:1280 host, port = await self._get_host_port(n)1281 cfg = combine_dict(1282 {"host": host, "port": port, "name":},1283 self.cfg.connect,1284 cls=attrdict,1285 )1286 auth = cfg.get("auth", None)1287 from .auth import gen_auth1288 cfg["auth"] = gen_auth(auth)1289 self.logger.debug("DelSync: connecting %s", cfg)1290 async with distkv_client.open_client(connect=cfg) as client:1291 # TODO auth this client1292 nodes = NodeSet()1293 n_nodes = 01294 async def send_nodes():1295 nonlocal nodes, n_nodes1296 await client._request( # pylint: disable=cell-var-from-loop1297 "check_deleted", iter=False, nchain=-1, nodes=nodes.serialize()1298 )1299 nodes.clear()1300 n_nodes = 01301 async def add(event):1302 nonlocal nodes, n_nodes1303 c = event.chain1304 if c is None:1305 return1306 nodes.add(, c.tick)1307 n_nodes += 11308 if n_nodes >= 100:1309 await send_nodes()1310 await self.root.walk(add)1311 if n_nodes > 0:1312 await send_nodes()1313 except (ServerConnectionError, ServerClosedError):1314 self.logger.exception("Unable to connect to %s", nodes)1315 else:1316 # The recipient will broadcast "info.deleted" messages for1317 # whatever it doesn't have, so we're done here.1318 return1319 def mark_deleted(self, node, tick):1320 """1321 This tick has been marked as deleted.1322 """1323 self._delete_also_nodes[].add(tick)1324 def purge_deleted(self, deleted):1325 """1326 These deleted entry is no longer required.1327 """1328 self.logger.debug("PurgeDel: %r", deleted)1329 for n, v in deleted.items():1330 n = Node(n, cache=self.node_cache)1331 n.purge_deleted(v)1332 async def get_state(1333 self,1334 nodes=False,1335 known=False,1336 superseded=False,1337 deleted=False,1338 missing=False,1339 present=False,1340 node_drop=False,1341 debug=False,1342 debugger=False,1343 remote_missing=False,1344 **_kw,1345 ):1346 """1347 Return some info about this node's internal state.1348 """1349 if known:1350 superseded = True1351 res = attrdict()1352 if nodes:1353 nd = res.nodes = {}1354 for n in self._nodes.values():1355 nd[] = n.tick1356 if superseded:1357 nd = res.known = {}1358 for n in self._nodes.values():1359 lk = n.local_superseded1360 if len(lk):1361 nd[] = lk.__getstate__()1362 if present:1363 nd = res.present = {}1364 for n in self._nodes.values():1365 lk = n.local_present1366 if len(lk):1367 nd[] = lk.__getstate__()1368 if deleted:1369 nd = res.deleted = {}1370 for n in self._nodes.values():1371 lk = n.local_deleted1372 if len(lk):1373 nd[] = lk.__getstate__()1374 if missing:1375 nd = res.missing = {}1376 for n in self._nodes.values():1377 if not n.tick:1378 continue1379 lk = n.local_missing1380 if len(lk):1381 nd[] = lk.__getstate__()1382 if remote_missing:1383 nd = res.remote_missing = {}1384 for n in self._nodes.values():1385 lk = n.remote_missing1386 if len(lk):1387 nd[] = lk.__getstate__()1388 if node_drop:1389 res.node_drop = list(self.node_drop)1390 if debug:1391 nd = res.debug = attrdict()1392 # TODO insert some debugging info1393 if debugger:1394 try:1395 import pdb_clone as pdb1396 except ImportError:1397 res["debugger"] = "Import error"1398 else:1399 pdb().set_trace_remote(host=b"", port=57935)1400 res["node"] = self.node.name1401 res["tock"] = self.tock1402 return res1403 async def user_update(self, msg):1404 """1405 Process an update message: deserialize it and apply the result.1406 """1407 msg = UpdateEvent.deserialize(self.root, msg, cache=self.node_cache, nulls_ok=True)1408 await msg.entry.apply(msg, server=self, root=self.paranoid_root)1409 async def user_info(self, msg):1410 """1411 Process info broadcasts.1412 """1413 if msg.node == return # ignore our own message1415 # Step 11416 ticks = msg.get("ticks", None)1417 if ticks is not None:1418 for n, t in ticks.items():1419 n = Node(n, cache=self.node_cache)1420 n.tick = max_n(n.tick, t)1421 # did this message pre-empt our own transmission?1422 rec = self._recover_tasks.get(msg.node, None)1423 if rec is not None:1424 rec.set(1)1425 self.logger.debug("Step1: %r triggered by %s", rec, msg.node)1426 # Step 21427 missing = msg.get("missing", None)1428 if missing is not None:1429 nn = 01430 for n, k in missing.items():1431 n = Node(n, cache=self.node_cache)1432 r = RangeSet()1433 r.__setstate__(k)1434 nn += len(r)1435 n.report_missing(r)1436 # add to the node's seen_missing1437 mr = self.seen_missing.get(n, None)1438 if mr is None:1439 self.seen_missing[n] = r1440 else:1441 mr += r1442 # did this message pre-empt our own transmission?1443 rec = self._recover_tasks.get(msg.node, None)1444 if rec is not None:1445 rec.set(2)1446 self.logger.debug("Step2: %r triggered by %s", rec, msg.node)1447 if nn > 0:1448 # Some data have been reported to be missing.1449 # Send them.1450 self.logger.debug("MISS %d %r", nn, self.seen_missing)1451 await self._run_send_missing(None)1452 # Step 31453 superseded = msg.get("superseded", None)1454 if superseded is None:1455 superseded = msg.get("known", None)1456 if superseded is not None:1457 for n, k in superseded.items():1458 n = Node(n, cache=self.node_cache)1459 r = RangeSet()1460 r.__setstate__(k)1461 r -= n.local_present1462 # might happen when loading stale data1463 n.report_superseded(r)1464 deleted = msg.get("deleted", None)1465 if deleted is not None:1466 for n, k in deleted.items():1467 n = Node(n, cache=self.node_cache)1468 r = RangeSet()1469 r.__setstate__(k)1470 n.report_deleted(r, self)1471 # Dropped nodes.1472 for nn in msg.get("node_drop", ()):1473 self._dropped_node(nn)1474 async def _delete_also(self):1475 """1476 Add deletion records to the delete actor.1477 """1478 while True:1479 await anyio.sleep(10)1480 if self._delete_also_nodes:1481 self._del_actor.add_deleted(self._delete_also_nodes)1482 self._delete_also_nodes = NodeSet()1483 def _pack_multiple(self, msg):1484 """"""1485 # protect against mistakenly encoded multi-part messages1486 # TODO use a msgpack extension instead1487 if isinstance(msg, Mapping):1488 i = 01489 while (f"_p{i}") in msg:1490 i += 11491 j = i1492 while i:1493 i -= 11494 msg[f"_p{i+1}"] = msg[f"_p{i}"]1495 if j:1496 msg["_p0"] = ""1497 p = packer(msg)1498 pl = self._part_len1499 if len(p) > SERF_MAXLEN:1500 # Owch. We need to split this thing.1501 self._part_seq = seq = self._part_seq + 11502 i = 01503 while i >= 0:1504 i += 11505 px, p = p[:pl], p[pl:]1506 if not p:1507 i = -i1508 px = {"_p0": (, seq, i, px)}1509 yield packer(px)1510 return1511 yield p1512 def _unpack_multiple(self, msg):1513 """1514 Undo the effects of _pack_multiple.1515 """1516 if isinstance(msg, Mapping) and "_p0" in msg:1517 p = msg["_p0"]1518 if p != "":1519 nn, seq, i, p = p1520 s = self._part_cache.get((nn, seq), None)1521 if s is None:1522 self._part_cache[(nn, seq)] = s = [None]1523 if i < 0:1524 i = -i1525 s[0] = b""1526 while len(s) <= i:1527 s.append(None)1528 s[i] = p1529 if None in s:1530 return None1531 p = b"".join(s)1532 del self._part_cache[(nn, seq)]1533 msg = unpacker(p)1534 msg["_p0"] = ""1535 i = 01536 while f"_p{i+1}" in msg:1537 msg[f"_p{i}"] = msg[f"_p{i+1}"]1538 i += 11539 del msg[f"_p{i}"]1540 return msg1541 async def monitor(self, action: str, delay: = None):1542 """1543 The task that hooks to the backend's event stream for receiving messages.1544 Args:1545 action: The action name1546 delay: an optional event to wait for, after starting the1547 listener but before actually processing messages. This helps to1548 avoid consistency problems on startup.1549 """1550 cmd = getattr(self, "user_" + action)1551 try:1552 async with self.backend.monitor(*self.cfg.server.root, action) as stream:1553 if delay is not None:1554 await delay.wait()1555 async for resp in stream:1556 msg = unpacker(resp.payload)1557 msg = self._unpack_multiple(msg)1558 if not msg: # None, empty, whatever1559 continue1560 self.logger.debug("Recv %s: %r", action, msg)1561 try:1562 with anyio.fail_after(15):1563 await self.tock_seen(msg.get("tock", 0))1564 await cmd(msg)1565 except TimeoutError:1566 self.logger.error("CmdTimeout! %s: %r", action, msg)1567 raise1568 except (CancelledError,anyio.get_cancelled_exc_class()):1569 # self.logger.warning("Cancelled %s", action)1570 raise1571 except BaseException as exc:1572 self.logger.exception("Died %s: %r", action, exc)1573 raise1574 else:1575"Stream ended %s", action)1576 async def _run_del(self, evt):1577 try:1578 await finally:1580 self._del_actor = None1581 async def _pinger(self, delay: """1583 This task1584 * sends PING messages1585 * handles incoming pings1586 * triggers split recovery1587 The initial ping is delayed randomly.1588 Args:1589 delay: an event to set after the initial ping message has been1590 sent.1591 """1592 T = get_transport("distkv")1593 async with Actor(1594 T(self.backend, *self.cfg.server.root, "ping"),1595,1596,1597 send_raw=True,1598 ) as actor:1599 self._actor = actor1600 await self._check_ticked()1601 delay.set()1602 async for msg in actor:1603 # self.logger.debug("IN %r",msg)1604 if isinstance(msg, RecoverEvent):1605 self.spawn(1606 self.recover_split,1607 msg.prio,1608 msg.replace,1609 msg.local_nodes,1610 msg.remote_nodes,1611 )1612 elif isinstance(msg, GoodNodeEvent):1613 self.spawn(self.fetch_data, msg.nodes)1614 elif isinstance(msg, RawMsgEvent):1615 msg = msg.msg1616 msg_node = msg.get("node", None)1617 if msg_node is None:1618 msg_node = msg.get("history", (None,))[0]1619 if msg_node is None:1620 continue1621 val = msg.get("value", None)1622 tock = None1623 if val is not None:1624 tock, val = val1625 await self.tock_seen(tock)1626 node = Node(msg_node, val, cache=self.node_cache)1627 if tock is not None:1628 node.tock = tock1629 elif isinstance(msg, TagEvent):1630 # We're "it"; find missing data1631 await self._send_missing()1632 elif isinstance(msg, (TagEvent, UntagEvent, DetagEvent)):1633 pass1634 # TODO tell clients, for cleanup tasks in handlers,1635 # e.g. error needs to consolidate messages1636 async def _get_host_port(self, host):1637 """Retrieve the remote system to connect to.1638 WARNING: While this is nice, there'a chicken-and-egg problem here.1639 While you can use the hostmap to temporarily add new hosts with1640 unusual addresses, the new host still needs a config entry.1641 """1642 # this is async because the test mock needs that1643 port = self.cfg.connect.port1644 domain = self.cfg.domain1645 try:1646 # First try to read the host name from the meta-root's1647 # "hostmap" entry, if any.1648 hme = self.root.follow(Path(None, "hostmap", host), create=False, nulls_ok=True)1649 if is NotGiven:1650 raise KeyError(host)1651 except KeyError:1652 hostmap = self.cfg.hostmap1653 if host in hostmap:1654 host = hostmap[host]1655 if not isinstance(host, str):1656 # must be a 2-element tuple1657 host, port = host1658 else:1659 # If it's a string, the port may have been passed as1660 # part of the hostname. (Notably on the command line.)1661 try:1662 host, port = host.rsplit(":", 1)1663 except ValueError:1664 pass1665 else:1666 port = int(port)1667 else:1668 # The hostmap entry in the database must be a tuple1669 host, port = hme.data1670 if domain is not None and "." not in host and host != "localhost":1671 host += "." + domain1672 return (host, port)1673 async def do_send_missing(self):1674 """Task to periodically send "missing …" messages"""1675 self.logger.debug("send-missing started")1676 clock = while self.fetch_missing:1678 if self.fetch_running is not False:1679 self.logger.debug("send-missing halted")1680 return1681 clock *= self._actor.random / 2 + 11682 await anyio.sleep(clock)1683 n = 01684 msg = dict()1685 for n in list(self.fetch_missing):1686 m = n.local_missing1687 nl = len(m)1688 if nl == 0:1689 self.fetch_missing.remove(n)1690 continue1691 mr = self.seen_missing.get(, None)1692 if mr is not None:1693 m -= mr1694 if len(m) == 0:1695 continue1696 msg[] = m.__getstate__()1697 self.seen_missing = {}1698 if not n: # nothing more to do1699 break1700 if not len(msg): # others already did the work, this time1701 continue1702 msg = attrdict(missing=msg)1703 self.logger.warning("Missing data: %r", msg)1704 await self._send_event("info", msg)1705 self.logger.debug("send-missing ended")1706 if self.node.tick is None:1707 self.node.tick = 01708 await self._check_ticked()1709 self.fetch_running = None1710 async def fetch_data(self, nodes, authoritative=False):1711 """1712 We are newly started and don't have any data.1713 Try to get the initial data from some other node.1714 """1715 if self.fetch_running is not None:1716 return1717 self.fetch_running = True1718 for n in nodes:1719 try:1720 host, port = await self._get_host_port(n)1721 cfg = combine_dict(1722 {"host": host, "port": port, "name":},1723 self.cfg.connect,1724 cls=attrdict,1725 )1726 auth = cfg.get("auth", None)1727 from .auth import gen_auth1728 cfg["auth"] = gen_auth(auth)1729"Sync: connecting: %s", cfg)1730 async with distkv_client.open_client(connect=cfg) as client:1731 # TODO auth this client1732 pl = PathLongener(())1733 res = await client._request(1734 "get_tree", iter=True,, nchain=-1, path=()1735 )1736 async for r in res:1737 pl(r)1738 r = UpdateEvent.deserialize(1739 self.root, r, cache=self.node_cache, nulls_ok=True1740 )1741 await r.entry.apply(r, server=self, root=self.paranoid_root)1742 await self.tock_seen(res.end_msg.tock)1743 pl = PathLongener((None,))1744 res = await client._request(1745 "get_tree_internal",1746 iter=True,1747,1748 nchain=-1,1749 path=(),1750 )1751 async for r in res:1752 pl(r)1753 r = UpdateEvent.deserialize(1754 self.root, r, cache=self.node_cache, nulls_ok=True1755 )1756 await r.entry.apply(r, server=self, root=self.paranoid_root)1757 await self.tock_seen(res.end_msg.tock)1758 res = await client._request(1759 "get_state",1760 nodes=True,1761,1762 known=True,1763 deleted=True,1764 iter=False,1765 )1766 await self._process_info(res)1767 except (AttributeError, KeyError, ValueError, AssertionError, TypeError):1768 raise1769 except Exception:1770 self.logger.exception("Unable to connect to %s:%d", host, port)1771 else:1772 # At this point we successfully cloned some other1773 # node's state, so we now need to find whatever that1774 # node didn't have.1775 if authoritative:1776 # … or not.1777 self._discard_all_missing()1778 for nst in self._nodes.values():1779 if nst.tick and len(nst.local_missing):1780 self.fetch_missing.add(nst)1781 if len(self.fetch_missing):1782 self.fetch_running = False1783 for nm in self.fetch_missing:1784 self.logger.error("Sync: missing: %s %s",, nm.local_missing)1785 self.spawn(self.do_send_missing)1786 if self.force_startup or not len(self.fetch_missing):1787 if self.node.tick is None:1788 self.node.tick = 01789 self.fetch_running = None1790 await self._check_ticked()1791 return1792 self.fetch_running = None1793 async def _process_info(self, msg):1794 """1795 Process "info" messages.1796 """1797 await self.tock_seen(msg.get("tock", 0))1798 # nodes: list of known nodes and their max ticks1799 for nn, t in msg.get("nodes", {}).items():1800 nn = Node(nn, cache=self.node_cache)1801 nn.tick = max_n(nn.tick, t)1802 # known: per-node range of ticks that have been resolved1803 for nn, k in msg.get("known", {}).items():1804 nn = Node(nn, cache=self.node_cache)1805 r = RangeSet()1806 r.__setstate__(k)1807 nn.report_superseded(r, local=True)1808 # deleted: per-node range of ticks that have been deleted1809 deleted = msg.get("deleted", {})1810 for nn, k in deleted.items():1811 nn = Node(nn, cache=self.node_cache)1812 r = RangeSet()1813 r.__setstate__(k)1814 nn.report_deleted(r, self)1815 # remote_missing: per-node range of ticks that should be re-sent1816 # This is used when loading data from a state file1817 for nn, k in msg.get("remote_missing", {}).items():1818 nn = Node(nn, cache=self.node_cache)1819 r = RangeSet()1820 r.__setstate__(k)1821 nn.report_missing(r)1822 # Dropped nodes.1823 for nn in msg.get("node_drop", ()):1824 self._dropped_node(nn)1825 async def drop_node(self, name):1826 self._dropped_node(name)1827 await self._send_event("info", attrdict(node_drop=[name]))1828 def _dropped_node(self, name):1829 try:1830 nn = Node(name, cache=self.node_cache, create=False)1831 except KeyError:1832 return1833 for _ in nn.enumerate(current=True):1834 break1835 else: # no item found1836 nn.kill_this_node(self.node_cache)1837 async def _check_ticked(self):1838 if self._ready is None:1839 return1840 if self.node.tick is not None:1841 self.logger.debug("Ready")1842 self._ready.set()1843 await self._set_tock()1844 else:1845 # self.logger.debug("Not yet ready.")1846 pass1847 async def recover_split(self, prio, replace, local_history, sources):1848 """1849 Recover from a network split.1850 """1851 with anyio.CancelScope() as scope:1852 for node in sources:1853 if node not in self._recover_tasks:1854 break1855 else:1856 return1857 t = _RecoverControl(self, scope, prio, local_history, sources)1858 self.logger.debug(1859 "SplitRecover %d: start %d %s local=%r remote=%r",1860 t._id,1861 prio,1862 replace,1863 local_history,1864 sources,1865 )1866 try:1867 await t._start()1868 clock = # Step 1: send an info/ticks message1870 # for prio=0 this fires immediately. That's intentional.1871 with anyio.move_on_after(clock * (1 - 1 / (1 << prio))) as x:1872 await t.wait(1)1873 if x.cancel_called:1874 msg = dict((, x.tick) for x in self._nodes.values())1875 msg = attrdict(ticks=msg)1876 if self.node_drop:1877 msg.node_drop = list(self.node_drop)1878 await self._send_event("info", msg)1879 # Step 2: send an info/missing message1880 # for prio=0 this fires after clock/2, so that we get a1881 # chance to wait for other info/ticks messages. We can't1882 # trigger on them because there may be more than one, for a1883 # n-way merge.1884 with anyio.move_on_after(clock * (2 - 1 / (1 << prio)) / 2) as x:1885 await t.wait(2)1886 if x.cancel_called:1887 await self._send_missing(force=True)1888 # wait a bit more before continuing. Again this depends on1889 # `prio` so that there won't be two nodes that send the same1890 # data at the same time, hopefully.1891 await anyio.sleep(clock * (1 - 1 / (1 << prio)))1892 # Step 3: start a task that sends stuff1893 await self._run_send_missing(prio)1894 finally:1895 with anyio.CancelScope(shield=True):1896 # Protect against cleaning up when another recovery task has1897 # been started (because we saw another merge)1898 self.logger.debug("SplitRecover %d: finished @%d", t._id, t.tock)1899 self.seen_missing = {}1900 t.cancel()1901 async def _send_missing(self, force=False):1902 msg = dict()1903 for n in list(self._nodes.values()):1904 if not n.tick:1905 continue1906 m = n.local_missing1907 mr = self.seen_missing.get(, None)1908 if mr is not None:1909 m -= mr1910 if len(m) == 0:1911 continue1912 msg[] = m.__getstate__()1913 if mr is None:1914 self.seen_missing[] = m1915 else:1916 mr += m1917 if force or msg:1918 msg = attrdict(missing=msg)1919 if self.node_drop:1920 msg.node_drop = list(self.node_drop)1921 await self._send_event("info", msg)1922 async def _run_send_missing(self, prio):1923 """Start :meth:`_send_missing_data` if it's not running"""1924 if self.sending_missing is None:1925 self.sending_missing = True1926 self.spawn(self._send_missing_data, prio)1927 elif not self.sending_missing:1928 self.sending_missing = True1929 async def _send_missing_data(self, prio):1930 """Step 3 of the re-join protocol.1931 For each node, collect events that somebody has reported as missing,1932 and re-broadcast them. If the event is unavailable, send a "known"1933 / "deleted" message.1934 """1935 self.logger.debug("SendMissing %s", prio)1936 clock = if prio is None:1938 await anyio.sleep(clock * (1 + self._actor.random / 3))1939 else:1940 await anyio.sleep(clock * (1 - (1 / (1 << prio)) / 2 - self._actor.random / 5))1941 self.logger.debug("SendMissingGo %s %s", prio, self.sending_missing)1942 while self.sending_missing:1943 self.sending_missing = False1944 nodes = list(self._nodes.values())1945 self._actor._rand.shuffle(nodes)1946 known = {}1947 deleted = {}1948 for n in nodes:1949 self.logger.debug(1950 "SendMissingGo %s %r %r",, n.remote_missing, n.local_superseded1951 )1952 k = n.remote_missing & n.local_superseded1953 for r in n.remote_missing & n.local_present:1954 for t in range(*r):1955 if t not in n.remote_missing:1956 # some other node could have sent this while we worked1957 await anyio.sleep( / 3)1958 continue1959 if t in n:1960 # could have been deleted while sleeping1961 msg = n[t].serialize()1962 await self._send_event("update", msg)1963 n.remote_missing.discard(t)1964 if k:1965 known[] = k.__getstate__()1966 d = n.remote_missing & n.local_deleted1967 if d:1968 deleted[] = d.__getstate__()1969 msg = attrdict()1970 if known:1971 msg.known = known1972 if deleted:1973 msg.deleted = deleted1974 if self.node_drop:1975 msg.node_drop = list(self.node_drop)1976 if msg:1977 await self._send_event("info", attrdict(known=known, deleted=deleted))1978 self.sending_missing = None1979 async def load(1980 self,1981 path: str = None,1982 stream: io.IOBase = None,1983 local: bool = False,1984 authoritative: bool = False,1985 ):1986 """Load data from this stream1987 Args:1988 ``fd``: The stream to read.1989 ``local``: Flag whether this file contains initial data and thus1990 its contents shall not be broadcast. Don't set this if1991 the server is already operational.1992 """1993 longer = PathLongener(())1994 if local and self.node.tick is not None:1995 raise RuntimeError("This server already has data.")1996 elif not local and self.node.tick is None:1997 raise RuntimeError("This server is not yet operational.")1998 async with MsgReader(path=path, stream=stream) as rdr:1999 async for m in rdr:2000 if "value" in m:2001 longer(m)2002 if "tock" in m:2003 await self.tock_seen(m.tock)2004 else:2005 m.tock = self.tock2006 m = UpdateEvent.deserialize(self.root, m, cache=self.node_cache, nulls_ok=True)2007 await self.tock_seen(m.tock)2008 await m.entry.apply(m, server=self, root=self.paranoid_root, loading=True)2009 elif "info" in m:2010 await self._process_info(m["info"])2011 elif "nodes" in m or "known" in m or "deleted" in m or "tock" in m: # XXX LEGACY2012 await self._process_info(m)2013 else:2014 self.logger.warning("Unknown message in stream: %s", repr(m))2015 if authoritative:2016 self._discard_all_missing()2017 self.logger.debug("Loading finished.")2018 def _discard_all_missing(self):2019 for n in self._nodes.values():2020 if not n.tick:2021 continue2022 lk = n.local_missing2023 if len(lk):2024 n.report_superseded(lk, local=True)2025 async def _save(self, writer, shorter, nchain=-1, full=False):2026 """Save the current state."""2027 async def saver(entry):2028 if is NotGiven:2029 return2030 res = entry.serialize(nchain=nchain)2031 shorter(res)2032 await writer(res)2033 msg = await self.get_state(nodes=True, known=True, deleted=True)2034 # await writer({"info": msg})2035 await writer(msg) # XXX legacy2036 await self.root.walk(saver, full=full)2037 async def save(self, path: str = None, stream=None, full=True):2038 """Save the current state to ``path`` or ``stream``."""2039 shorter = PathShortener([])2040 async with MsgWriter(path=path, stream=stream) as mw:2041 await self._save(mw, shorter, full=full)2042 async def save_stream(2043 self,2044 path: str = None,2045 stream: = None,2046 save_state: bool = False,2047 done: ValueEvent = None,2048 done_val=None,2049 ):2050 """Save the current state to ``path`` or ``stream``.2051 Continue writing updates until cancelled.2052 Args:2053 path: The file to save to.2054 stream: the stream to save to.2055 save_state: Flag whether to write the current state.2056 If ``False`` (the default), only write changes.2057 done: set when writing changes commences, signalling2058 that the old save file (if any) may safely be closed.2059 Exactly one of ``stream`` or ``path`` must be set.2060 This task flushes the current buffer to disk when one second2061 passes without updates, or every 100 messages.2062 """2063 shorter = PathShortener([])2064 async with MsgWriter(path=path, stream=stream) as mw:2065 msg = await self.get_state(nodes=True, known=True, deleted=True)2066 # await mw({"info": msg})2067 await mw(msg) # XXX legacy2068 last_saved = time.monotonic()2069 last_saved_count = 02070 async with Watcher(self.root, full=True) as updates:2071 await self._ready.wait()2072 if save_state:2073 await self._save(mw, shorter, full=True)2074 await mw.flush()2075 if done is not None:2076 s = done.set(done_val)2077 if s is not None:2078 await s2079 cnt = 02080 while True:2081 # This dance ensures that we save the system state often enough.2082 t = time.monotonic()2083 td = t - last_saved2084 if td >= 60 or last_saved_count > 1000:2085 msg = await self.get_state(nodes=True, known=True, deleted=True)2086 # await mw({"info": msg})2087 await mw(msg) # XXX legacy2088 await mw.flush()2089 last_saved = time.monotonic()2090 last_saved_count = 02091 td = -99999 # translates to something large, below2092 cnt = 02093 try:2094 with anyio.fail_after(1 if cnt else 60 - td):2095 msg = await updates.__anext__()2096 except TimeoutError:2097 await mw.flush()2098 cnt = 02099 else:2100 msg = msg.serialize()2101 shorter(msg)2102 last_saved_count += 12103 await mw(msg)2104 if cnt >= 100:2105 await mw.flush()2106 cnt = 02107 else:2108 cnt += 12109 async def _saver(2110 self, path: str = None, stream=None, done: ValueEvent = None, save_state=False2111 ):2112 with anyio.CancelScope() as s:2113 sd = anyio.Event()2114 state = (s, sd)2115 self._savers.append(state)2116 try:2117 await self.save_stream(2118 path=path, stream=stream, done=done, done_val=s, save_state=save_state2119 )2120 except EnvironmentError as err:2121 if done is None:2122 raise2123 done.set_error(err)2124 finally:2125 with anyio.CancelScope(shield=True):2126 sd.set()2127 async def run_saver(self, path: str = None, stream=None, save_state=False, wait: bool = True):2128 """2129 Start a task that continually saves to disk.2130 At most one one saver runs at a time; if a new one is started,2131 the old saver is cancelled as soon as the new saver's current state2132 is on disk (if told to do so) and it is ready to start writing.2133 Args:2134 path (str): The file to save to. If ``None``, simply stop any2135 already-running log.2136 stream ( the stream to save to.2137 save_state (bool): Flag whether to write the current state.2138 If ``False`` (the default), only write changes.2139 wait: wait for the save to really start.2140 """2141 done = ValueEvent() if wait else None2142 res = None2143 if path is not None:2144 self.spawn(2145 partial(self._saver, path=path, stream=stream, save_state=save_state, done=done)2146 )2147 if wait:2148 res = await done.get()2149 # At this point the new saver is operational, so we cancel the old one(s).2150 while self._savers is not None and self._savers[0][0] is not res:2151 s, sd = self._savers.pop(0)2152 s.cancel()2153 await sd.wait()2154 async def _sigterm(self):2155 with anyio.open_signal_receiver(signal.SIGTERM) as r:2156 async for s in r:2157 for s, sd in self._savers:2158 s.cancel()2159 await sd.wait()2160 break2161 os.kill(os.getpid(), signal.SIGTERM)2162 @property2163 async def is_ready(self):2164 """Await this to determine if/when the server is operational."""2165 await self._ready.wait()2166 @property2167 async def is_serving(self):2168 """Await this to determine if/when the server is serving clients."""2169 await self._ready2.wait()2170 async def serve(self, log_path=None, log_inc=False, force=False, ready_evt=None):2171 """Task that opens a backend connection and actually runs the server.2172 Args:2173 ``setup_done``: optional event that's set when the server is initially set up.2174 ``log_path``: path to a binary file to write changes and initial state to.2175 ``log_inc``: if saving, write changes, not the whole state.2176 ``force``: start up even if entries are missing2177 """2178 self.force_startup = force2179 back = get_backend(self.cfg.server.backend)2180 try:2181 conn = self.cfg.server[self.cfg.server.backend]2182 except KeyError:2183 conn = self.cfg.server.connect2184 async with back(**conn) as backend:2185 # pylint: disable=attribute-defined-outside-init2186 # Collect all "info/missing" messages seen since the last2187 # healed network split so that they're only sent once.2188 self.seen_missing = {}2189 # Is the missing-items-sender running?2190 # None=no, otherwise flag whether it should run another round2191 self.sending_missing = None2192 # Nodes which list missing events2193 self.fetch_missing = set()2194 # Flag whether do_fetch_missing is running (True)2195 # or do_send_missing is running (False)2196 # or neither (None)2197 self.fetch_running = None2198 # Set when self.node.tick is no longer None, i.e. we have some2199 # reasonable state2200 self._ready = anyio.Event()2201 # set when we're ready to accept client connections2202 self._ready2 = anyio.Event()2203 self.backend = backend2204 self.spawn = backend.spawn2205 # Sync recovery steps so that only one node per branch answers2206 self._recover_event1 = None2207 self._recover_event2 = None2208 # local and remote node lists2209 self._recover_sources = None2210 # Cancel scope; if :meth:`recover_split` is running, use that2211 # to cancel2212 self._recover_tasks = {}2213 # used to sync starting up everything so no messages get either2214 # lost, or processed prematurely2215 delay = anyio.Event()2216 delay2 = anyio.Event()2217 delay3 = anyio.Event()2218 self.spawn(self._run_del, delay3)2219 self.spawn(self._delete_also)2220 if log_path is not None:2221 await self.run_saver(path=log_path, save_state=not log_inc, wait=False)2222 # Link up our "user_*" code2223 for d in dir(self):2224 if d.startswith("user_"):2225 self.spawn(self.monitor, d[5:], delay)2226 await delay3.wait()2227 self.spawn(self.watcher)2228 if self._init is not NotGiven:2229 assert self.node.tick is None2230 self.node.tick = 02231 async with self.next_event() as event:2232 await self.root.set_data(event, self._init, tock=self.tock, server=self)2233 self.spawn(self._sigterm)2234 # send initial ping2235 self.spawn(self._pinger, delay2)2236 await anyio.sleep(0.1)2237 delay.set()2238 await self._check_ticked() # when _init is set2239 await delay2.wait()2240 await self._ready.wait()2241 cfgs = self.cfg.server.bind2242 cfg_b = self.cfg.server.bind_default2243 evts = []2244 async with anyio.create_task_group() as tg:2245 for n, cfg in enumerate(cfgs):2246 cfg = combine_dict(cfg, cfg_b, cls=attrdict)2247 evt = anyio.Event()2248 evts.append(evt)2249 tg.start_soon(self._accept_clients, tg, cfg, n, evt)2250 for evt in evts:2251 await evt.wait()2252 self._ready2.set()2253 if ready_evt is not None:2254 ready_evt.set()2255 pass # end of server taskgroup2256 pass # end of server2257 pass # end of backend client2258 async def _accept_clients(self, tg, cfg, n, evt):2259 ssl_ctx = gen_ssl(cfg["ssl"], server=True)2260 cfg = combine_dict({"ssl": ssl_ctx}, cfg, cls=attrdict)2261 def rdy(n, server):2262 if n == 0:2263 port = server.extra(SocketAttribute.local_address)2264 self.ports = [port]2265 evt.set()2266 await run_tcp_server(self._connect, tg=tg, _rdy=partial(rdy, n), **cfg)2267 async def _connect(self, stream):2268 c = None2269 try:2270 c = ServerClient(server=self, stream=stream)2271 self._clients.add(c)2272 await except (ClosedResourceError, anyio.EndOfStream):2274 self.logger.debug("XX %d closed", c._client_nr)2275 except BaseException as exc:2276 CancelExc = anyio.get_cancelled_exc_class()2277 if isinstance(exc, ExceptionGroup):2278 # pylint: disable=no-member2279 exc = exc.filter(lambda e: None if isinstance(e, CancelExc) else e, exc)2280 if exc is not None and not isinstance(exc, CancelExc):2281 if isinstance(exc, (ClosedResourceError, anyio.EndOfStream)):2282 self.logger.debug("XX %d closed", c._client_nr)2283 else:2284 self.logger.exception("Client connection killed", exc_info=exc)2285 if exc is None:2286 exc = "Cancelled"2287 try:2288 with anyio.move_on_after(2) as cs:2289 cs.shield = True2290 if c is not None:2291 await c.send({"error": str(exc)})2292 except Exception:2293 pass2294 finally:2295 with anyio.move_on_after(2, shield=True):2296 if c is not None:2297 self._clients.remove(c)...

Full Screen

Full Screen Github


Full Screen

...378 def _recover_processes(self):379 agent_tasks = self._create_recovery_agent_tasks()380 self._register_pidfiles(agent_tasks)381 _drone_manager.refresh()382 self._recover_tasks(agent_tasks)383 self._recover_pending_entries()384 self._check_for_unrecovered_verifying_entries()385 self._reverify_remaining_hosts()386 # reinitialize drones after killing orphaned processes, since they can387 # leave around files when they die388 _drone_manager.execute_actions()389 _drone_manager.reinitialize_drones()390 def _create_recovery_agent_tasks(self):391 return (self._get_queue_entry_agent_tasks()392 + self._get_special_task_agent_tasks(is_active=True))393 def _get_queue_entry_agent_tasks(self):394 """395 Get agent tasks for all hqe in the specified states.396 Loosely this translates to taking a hqe in one of the specified states,397 say parsing, and getting an AgentTask for it, like the FinalReparseTask,398 through _get_agent_task_for_queue_entry. Each queue entry can only have399 one agent task at a time, but there might be multiple queue entries in400 the group.401 @return: A list of AgentTasks.402 """403 # host queue entry statuses handled directly by AgentTasks (Verifying is404 # handled through SpecialTasks, so is not listed here)405 statuses = (models.HostQueueEntry.Status.STARTING,406 models.HostQueueEntry.Status.RUNNING,407 models.HostQueueEntry.Status.GATHERING,408 models.HostQueueEntry.Status.PARSING,409 models.HostQueueEntry.Status.ARCHIVING)410 status_list = ','.join("'%s'" % status for status in statuses)411 queue_entries = scheduler_models.HostQueueEntry.fetch(412 where='status IN (%s)' % status_list)413 autotest_stats.Gauge('scheduler.jobs_per_tick').send(414 'running', len(queue_entries))415 agent_tasks = []416 used_queue_entries = set()417 for entry in queue_entries:418 if self.get_agents_for_entry(entry):419 # already being handled420 continue421 if entry in used_queue_entries:422 # already picked up by a synchronous job423 continue424 agent_task = self._get_agent_task_for_queue_entry(entry)425 agent_tasks.append(agent_task)426 used_queue_entries.update(agent_task.queue_entries)427 return agent_tasks428 def _get_special_task_agent_tasks(self, is_active=False):429 special_tasks = models.SpecialTask.objects.filter(430 is_active=is_active, is_complete=False)431 return [self._get_agent_task_for_special_task(task)432 for task in special_tasks]433 def _get_agent_task_for_queue_entry(self, queue_entry):434 """435 Construct an AgentTask instance for the given active HostQueueEntry.436 @param queue_entry: a HostQueueEntry437 @return: an AgentTask to run the queue entry438 """439 task_entries = queue_entry.job.get_group_entries(queue_entry)440 self._check_for_duplicate_host_entries(task_entries)441 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,442 models.HostQueueEntry.Status.RUNNING):443 if queue_entry.is_hostless():444 return HostlessQueueTask(queue_entry=queue_entry)445 return QueueTask(queue_entries=task_entries)446 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:447 return postjob_task.GatherLogsTask(queue_entries=task_entries)448 if queue_entry.status == models.HostQueueEntry.Status.PARSING:449 return postjob_task.FinalReparseTask(queue_entries=task_entries)450 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:451 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)452 raise scheduler_lib.SchedulerError(453 '_get_agent_task_for_queue_entry got entry with '454 'invalid status %s: %s' % (queue_entry.status, queue_entry))455 def _check_for_duplicate_host_entries(self, task_entries):456 non_host_statuses = (models.HostQueueEntry.Status.PARSING,457 models.HostQueueEntry.Status.ARCHIVING)458 for task_entry in task_entries:459 using_host = ( is not None460 and task_entry.status not in non_host_statuses)461 if using_host:462 self._assert_host_has_no_agent(task_entry)463 def _assert_host_has_no_agent(self, entry):464 """465 @param entry: a HostQueueEntry or a SpecialTask466 """467 if self.host_has_agent( agent = tuple(self._host_agents.get([0]469 raise scheduler_lib.SchedulerError(470 'While scheduling %s, host %s already has a host agent %s'471 % (entry,, agent.task))472 def _get_agent_task_for_special_task(self, special_task):473 """474 Construct an AgentTask class to run the given SpecialTask and add it475 to this dispatcher.476 A special task is created through schedule_special_tasks, but only if477 the host doesn't already have an agent. This happens through478 add_agent_task. All special agent tasks are given a host on creation,479 and a Null hqe. To create a SpecialAgentTask object, you need a480 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask481 object contains a hqe it's passed on to the special agent task, which482 creates a HostQueueEntry and saves it as it's queue_entry.483 @param special_task: a models.SpecialTask instance484 @returns an AgentTask to run this SpecialTask485 """486 self._assert_host_has_no_agent(special_task)487 special_agent_task_classes = (prejob_task.CleanupTask,488 prejob_task.VerifyTask,489 prejob_task.RepairTask,490 prejob_task.ResetTask,491 prejob_task.ProvisionTask)492 for agent_task_class in special_agent_task_classes:493 if agent_task_class.TASK_TYPE == special_task.task:494 return agent_task_class(task=special_task)495 raise scheduler_lib.SchedulerError(496 'No AgentTask class for task', str(special_task))497 def _register_pidfiles(self, agent_tasks):498 for agent_task in agent_tasks:499 agent_task.register_necessary_pidfiles()500 def _recover_tasks(self, agent_tasks):501 orphans = _drone_manager.get_orphaned_autoserv_processes()502 for agent_task in agent_tasks:503 agent_task.recover()504 if agent_task.monitor and agent_task.monitor.has_process():505 orphans.discard(agent_task.monitor.get_process())506 self.add_agent_task(agent_task)507 self._check_for_remaining_orphan_processes(orphans)508 def _get_unassigned_entries(self, status):509 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"510 % status):511 if entry.status == status and not self.get_agents_for_entry(entry):512 # The status can change during iteration, e.g., if # sets a group of queue entries to Starting514 yield entry...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?