Best Python code snippet using localstack_python
remote.py
Source:remote.py  
# ... (excerpt starts inside a class body; the class statement itself and any
# members defined before ``remote_logging`` are not part of this view)
# NOTE(review): the class header below is reconstructed, not copied from the
# excerpt. The name is taken from ``class PodSpawnHelper(PeaSpawnHelper)``,
# whose subclasses call the inherited ``remote_logging``; the ``GrpcClient``
# base is inferred from ``GrpcClient.close(self)`` in ``PodSpawnHelper.close``.
# Confirm both against the full file.
class PeaSpawnHelper(GrpcClient):

    def remote_logging(self, req, set_ready):
        """Stream the responses of ``self._stub.Spawn(req)`` into the remote logger.

        :param req: the request object handed to ``self._stub.Spawn``
        :param set_ready: optional callable; when truthy and
            ``self.callback_on_first`` is set, it is called once with the
            first response, after which ``self.callback_on_first`` is cleared
        """
        try:
            for resp in self._stub.Spawn(req):
                if set_ready and self.callback_on_first:
                    set_ready(resp)
                    self.callback_on_first = False
                self._remote_logger.info(resp.log_record)
        except grpc.RpcError:
            # NOTE(review): RPC errors end the stream silently; presumably
            # this is how a terminated remote shows up -- confirm.
            pass

    def close(self):
        """Send TERMINATE to ``self.ctrl_addr`` (if set) and close the client once."""
        if not self.is_closed:
            if self.ctrl_addr:
                send_ctrl_message(self.ctrl_addr, jina_pb2.Request.ControlRequest.TERMINATE,
                                  timeout=self.timeout_shutdown)
            super().close()
            self.is_closed = True


class PodSpawnHelper(PeaSpawnHelper):
    """Spawn helper that tracks one control address per pea of a pod."""

    body_tag = 'pod'

    def __init__(self, args: 'argparse.Namespace'):
        super().__init__(args)
        self.all_ctrl_addr = []  #: all peas control address and ports of this pod, need to be set in set_ready()

    def close(self):
        """Send TERMINATE to every address in ``self.all_ctrl_addr``, then close once."""
        if not self.is_closed:
            for ctrl_addr in self.all_ctrl_addr:
                send_ctrl_message(ctrl_addr, jina_pb2.Request.ControlRequest.TERMINATE,
                                  timeout=self.timeout_shutdown)
            # Calls GrpcClient.close directly rather than super().close(), so
            # the parent's single-address TERMINATE step is not executed.
            GrpcClient.close(self)
            self.is_closed = True


class MutablePodSpawnHelper(PodSpawnHelper):
    """Spawn helper built from a dict of pea argument objects."""

    def __init__(self, peas_args: Dict):
        """Initialise from the first non-empty entry and collect control addresses.

        :param peas_args: dict whose values are either a single pea argument
            object, a list of them, or a falsy value (which is skipped)
        """
        # NOTE(review): if every value is falsy, ``super().__init__`` is never
        # called and only ``self.args`` gets set -- confirm callers never pass
        # an all-empty dict.
        inited = False
        for k in peas_args.values():
            if k:
                if not isinstance(k, list):
                    k = [k]
                if not inited:
                    # any pea will do, we just need its host and port_grpc
                    super().__init__(k[0])
                    inited = True
                for kk in k:
                    kk.log_remote = True
                    self.all_ctrl_addr.append(Zmqlet.get_ctrl_address(kk)[0])
        self.args = peas_args

    def call(self, set_ready: Callable = None):
        """Build the mutable-pod request from ``self.args`` and stream its logs."""
        self.remote_logging(peas_args2mutable_pod_req(self.args), set_ready)


def peas_args2mutable_pod_req(peas_args: Dict):
    """Convert a ``{'head', 'tail', 'peas'}`` dict into a ``SpawnRequest``.

    :param peas_args: dict with keys ``'head'``, ``'tail'`` and ``'peas'``;
        falsy entries are left out of the request
    :return: a ``jina_pb2.SpawnRequest`` with ``mutable_pod`` filled in
    """

    def pod2pea_args_list(args):
        return kwargs2list(vars(args))

    req = jina_pb2.SpawnRequest()
    if peas_args['head']:
        req.mutable_pod.head.args.extend(pod2pea_args_list(peas_args['head']))
    if peas_args['tail']:
        req.mutable_pod.tail.args.extend(pod2pea_args_list(peas_args['tail']))
    if peas_args['peas']:
        for q in peas_args['peas']:
            _a = req.mutable_pod.peas.add()
            _a.args.extend(pod2pea_args_list(q))
    return req


def mutable_pod_req2peas_args(req):
    """Parse the argument lists of a mutable-pod request back into a dict.

    :param req: object exposing ``head.args``, ``tail.args`` and ``peas``
    :return: dict with ``'head'`` and ``'tail'`` (parsed arguments, or ``None``
        when the corresponding list is empty) and ``'peas'`` (a list, possibly
        empty)
    """
    from ..main.parser import set_pea_parser

    def _parse(arg_list):
        # A fresh parser per call, exactly as before; unknown arguments are
        # ignored by taking only the first item of parse_known_args().
        return set_pea_parser().parse_known_args(arg_list)[0]

    return {
        'head': _parse(req.head.args) if req.head.args else None,
        'tail': _parse(req.tail.args) if req.tail.args else None,
        'peas': [_parse(q.args) for q in req.peas] if req.peas else [],
    }


class RemotePea(BasePea):
    """A RemotePea that spawns a remote :class:`BasePea`

    Useful in Jina CLI
    """

    remote_helper = PeaSpawnHelper

    def loop_body(self):
        """Create the spawn helper from ``self.args`` and start it."""
        self._remote = self.remote_helper(self.args)
        self._remote.start(self.set_ready)  # auto-close after

    def close(self):
        """Close the spawn helper, if :meth:`loop_body` has created one."""
        # ``_remote`` only exists after loop_body() ran; without this guard a
        # close() before that point raised AttributeError.
        remote = getattr(self, '_remote', None)
        if remote is not None:
            remote.close()


class RemotePod(RemotePea):
    """A RemotePod that spawns a remote :class:`BasePod`

    Useful in Jina CLI
    """

    remote_helper = PodSpawnHelper

    def set_ready(self, resp):
        """Register the control address of every pea reported in ``resp``.

        :param resp: response whose ``body`` oneof carries the head, tail and
            peas argument lists
        """
        _rep = getattr(resp, resp.WhichOneof('body'))
        peas_args = mutable_pod_req2peas_args(_rep)
        all_args = peas_args['peas'] + (
            [peas_args['head']] if peas_args['head'] else []) + (
                       [peas_args['tail']] if peas_args['tail'] else [])
        for s in all_args:
            # The parsed host is replaced by this pod's own host before the
            # control address is derived.
            s.host = self.args.host
            self._remote.all_ctrl_addr.append(Zmqlet.get_ctrl_address(s)[0])
        super().set_ready()


class RemoteMutablePod(RemotePea):
    """A RemoteMutablePod that spawns a remote :class:`MutablePod`.

    Useful in Flow API
    """
    # ... (excerpt ends here; the rest of this class is not part of this view)

# process.python-2.python
Source:process.python-2.python  
# ... (excerpt starts mid-method; the only visible part of that method is its
# final statement, ``return None``, so it is not reproduced as code here)
# NOTE(review): the class header below is reconstructed, not copied from the
# excerpt. The name comes from ``class Queue(WaitObject)`` and the
# ``WaitObject.__init__(self)`` calls; the ``object`` base follows the style of
# ``Thread`` and ``Switcher``. Confirm against the full file.
class WaitObject(object):
    # ... (the constructor and the truncated method are outside this view)

    def add_waiter(self, waiter):
        """Append ``waiter`` to ``self.waiters``."""
        self.waiters.append(waiter)

    def set_ready(self, ready):
        """Report this object's ready state to ``self.switcher``."""
        self.switcher.set_ready(self, ready)


class Queue(WaitObject):
    """FIFO of values that marks itself ready while it holds data."""

    def __init__(self):
        WaitObject.__init__(self)
        self.data = []

    def put(self, value):
        """Append ``value`` and mark the queue ready."""
        self.data.append(value)
        self.set_ready(True)

    def get(self):
        """Remove and return the oldest value."""
        # The ready flag is updated before the pop: it stays set only when at
        # least one value will remain afterwards.
        self.set_ready(len(self.data) > 1)
        return self.data.pop(0)


class _Ready(WaitObject):
    """Wait object that marks itself ready on construction."""

    def __init__(self):
        WaitObject.__init__(self)
        self.set_ready(True)


# Special-casing Ready as a singleton is important for scalability
Ready = singleton(_Ready)


class Thread(object):
    """Registers itself with a switcher on construction."""

    def __init__(self, switcher=None):
        # Falls back to global_switcher() when no switcher is passed in.
        self.switcher = switcher or global_switcher()
        self.switcher.add_thread(self)


class Switcher(object):
    """Keeps the set of registered threads and the set of ready wait objects."""

    def __init__(self):
        self.threads = set()
        self.ready_objects = set()

    def add_thread(self, thread):
        """Register ``thread`` and store the result of ``thread.run()`` as its runner."""
        Ready().add_waiter(thread)
        thread.runner = thread.run()
        self.threads.add(thread)

    def set_ready(self, wait_object, ready):
        """Add ``wait_object`` to the ready set when ``ready`` is truthy, else drop it."""
        if ready:
            self.ready_objects.add(wait_object)
        else:
            self.ready_objects.discard(wait_object)

    def run(self):
        """Advance the runner of a waiter for each ready object while threads remain."""
        while len(self.threads):
            for r in self.ready_objects:
                thread = r.get_waiter()
                if not thread or not thread.runner:
                    continue
                try:
                    # ``.next()`` is the Python 2 iterator method.
                    wait_object = thread.runner.next()
                except StopIteration:
                    thread.runner = None
                    # ... (excerpt ends here; the rest of this method is not
                    # part of this view)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios.
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
