Best Python code snippet using playwright-python
evaluation.py
Source:evaluation.py  
# ... (excerpt begins inside an earlier function whose header is not shown)
#            else get_i18n_string("judge.evaluation.dynamic")
#        )
#    elif isinstance(test, ExitCodeOutputChannel):
#        return str(test.value)


def _add_channel(
    bundle: Bundle, output: OutputChannel, channel: Channel, updates: List[Update]
):
    """
    Queue a placeholder test for one output channel.

    Nothing happens unless ``should_show`` approves the channel. Otherwise a
    ``StartTest``/``CloseTest`` pair is appended to ``updates``: the closing
    update carries an empty generated value, the ``NOT_EXECUTED`` status and
    is not accepted.

    :param bundle: The configuration bundle.
    :param output: The output channel as described in the testplan.
    :param channel: The channel type to report the test under.
    :param updates: The list that receives the new updates (modified in place).
    """
    if not should_show(output, channel):
        return

    expected = guess_expected_value(bundle, output)
    updates.append(StartTest(expected=expected, channel=channel))

    not_executed = StatusMessage(
        enum=Status.NOT_EXECUTED,
        human=get_i18n_string("judge.evaluation.missing.test"),
    )
    updates.append(CloseTest(generated="", status=not_executed, accepted=False))


def prepare_evaluation(bundle: Bundle, collector: OutputManager):
    """
    Pre-fill the collector with output that mirrors the expected testplan.

    This prepared output is what will be shown when the normal execution
    terminates early for some reason. Every tab, context, testcase and
    visible channel of the plan is emitted, but nothing is accepted.

    :param bundle: The configuration bundle.
    :param collector: The output collector.
    """
    collector.prepare_judgment(StartJudgment())

    for tab_index, tab in enumerate(bundle.plan.tabs):
        collector.prepare_tab(StartTab(title=tab.name, hidden=tab.hidden), tab_index)
        context_index = 0

        for run in tab.runs:
            main_testcase = run.run
            has_main = main_testcase.input.main_call
            # Only remember the exit code channel here; it is attached to the
            # very last testcase of the run further down.
            exit_output = main_testcase.output.exit_code

            if has_main:
                collector.prepare_context(
                    StartContext(description=main_testcase.description),
                    tab_index,
                    context_index,
                )
                readable_input, inlined_files = get_readable_input(
                    bundle, main_testcase.link_files, main_testcase
                )

                # Linked files that were not inlined get their own message.
                linked_only = set(main_testcase.link_files).difference(inlined_files)
                if linked_only:
                    _link_files_message(linked_only, collector)

                missing_context = AppendMessage(
                    message=get_i18n_string("judge.evaluation.missing.context")
                )
                updates = [missing_context, StartTestcase(description=readable_input)]

                # The regular output channels of the main testcase.
                output = main_testcase.output
                _add_channel(bundle, output.stdout, Channel.STDOUT, updates)
                _add_channel(bundle, output.stderr, Channel.STDERR, updates)
                _add_channel(bundle, output.file, Channel.FILE, updates)
                _add_channel(bundle, output.exception, Channel.EXCEPTION, updates)

                # Without further contexts, the main testcase is the last one.
                if not run.contexts:
                    _add_channel(bundle, exit_output, Channel.EXIT, updates)

                updates.append(CloseTestcase(accepted=False))
                collector.prepare_context(updates, tab_index, context_index)
                collector.prepare_context(
                    CloseContext(accepted=False), tab_index, context_index
                )
                context_index += 1

            first_position = int(has_main)
            for position, context in enumerate(run.contexts, start=first_position):
                collector.prepare_context(
                    StartContext(description=context.description),
                    tab_index,
                    context_index,
                )
                updates = [
                    AppendMessage(
                        message=get_i18n_string("judge.evaluation.missing.context")
                    )
                ]
                inlined: Set[FileUrl] = set()
                is_last_context = position == first_position + len(run.contexts) - 1

                # The normal testcases of this context (numbered from 1).
                for number, testcase in enumerate(context.testcases, 1):
                    readable_input, seen = get_readable_input(
                        bundle, context.link_files, testcase
                    )
                    inlined = inlined.union(seen)
                    updates.append(StartTestcase(description=readable_input))

                    # The regular output channels, including the return value.
                    output = testcase.output
                    _add_channel(bundle, output.stdout, Channel.STDOUT, updates)
                    _add_channel(bundle, output.stderr, Channel.STDERR, updates)
                    _add_channel(bundle, output.file, Channel.FILE, updates)
                    _add_channel(bundle, output.exception, Channel.EXCEPTION, updates)
                    _add_channel(bundle, output.result, Channel.RETURN, updates)

                    # The exit code belongs to the last testcase of the last context.
                    if is_last_context and number == len(context.testcases):
                        _add_channel(bundle, exit_output, Channel.EXIT, updates)

                    updates.append(CloseTestcase(accepted=False))

                # Linked files that were not inlined go in front of everything else.
                linked_only = set(context.link_files).difference(inlined)
                if linked_only:
                    updates.insert(0, _link_files_message(linked_only))

                collector.prepare_context(updates, tab_index, context_index)
                collector.prepare_context(
                    CloseContext(accepted=False), tab_index, context_index
                )
                context_index += 1

        collector.prepare_tab(CloseTab(), tab_index)

# ... (excerpt truncated)
# client.py
Source:client.py  
# ... (excerpt begins inside an earlier method whose header is not shown)
#    # No target time, as no work to do
#    self.target_time = 0
#    self.work_items = [ ]
#    for url in urls:
#      self._add_channel(url)

# NOTE: the definitions below are methods of a class whose header lies
# outside this excerpt; they are shown dedented.


def _reconnect(self, url, event_name, event_arg):
  """Handle an event reported for URL and forward it to the user callback.

  A 'closed' or 'error' event first schedules a reconnect for URL; every
  event is then passed on to self.event_callback unchanged.
  """
  if event_name in ('closed', 'error'):
    # Stupid connection closed for some reason. Set up a reconnect. Note
    # that it should have been removed from asyncore.socket_map already.
    self._reconnect_later(url)

  # Call the user's callback now.
  self.event_callback(url, event_name, event_arg)


def _reconnect_later(self, url):
  """Queue URL as a work item so that it is reconnected in a little while."""
  self.work_items.append(url)

  # Only set a target if one has not been set yet. Otherwise, we could
  # create a race condition of continually moving out towards the future
  if not self.target_time:
    self.target_time = time.time() + RECONNECT_DELAY


def _add_channel(self, url):
  """Create a Client for URL, wired to the commit and reconnect callbacks."""
  # Simply instantiating the client will install it into the global map
  # for processing in the main event loop.
  Client(url,
         functools.partial(self.commit_callback, url),
         functools.partial(self._reconnect, url))


def _check_stale(self):
  """Close every client that has been silent too long and requeue its URL.

  A client is stale when its last activity lies more than STALE_DELAY
  before now. For each stale client the user callback receives a 'stale'
  event, the client is closed and its URL is scheduled for a reconnect.
  """
  now = time.time()

  # Iterate over a snapshot: close() removes the client from
  # asyncore.socket_map, and mutating a dict while iterating its live
  # values() view raises RuntimeError on Python 3.
  for client in list(asyncore.socket_map.values()):
    if client.last_activity + STALE_DELAY < now:
      # Whoops. No activity in a while. Signal this fact, close the
      # Client, then have it reconnected later on.
      self.event_callback(client.url, 'stale', client.last_activity)

      # This should remove it from .socket_map.
      client.close()

      self._reconnect_later(client.url)


def _maybe_work(self):
  """Process the queued work items once the target time has been reached."""
  # If we haven't reached the targeted time, or have no work to do,
  # then fast-path exit
  if time.time() < self.target_time or not self.work_items:
    return

  # We'll take care of all the work items, so no target for future work
  self.target_time = 0

  # Play a little dance just in case work gets added while we're
  # currently working on stuff
  work = self.work_items
  self.work_items = []
  for url in work:
    self._add_channel(url)


def run_forever(self):
  """Drive the event loop forever, checking for stale clients each pass.

  NOTE(review): the excerpt is cut off after the stale check, so the loop
  body may continue in the full source.
  """
  while True:
    if asyncore.socket_map:
      asyncore.loop(timeout=TIMEOUT, count=1)
    else:
      time.sleep(TIMEOUT)

    self._check_stale()

# ... (excerpt truncated)
# asyncapi.py
Source:asyncapi.py  
# ... (excerpt begins inside an earlier method whose header is not shown)
#        )
#        self.channels = []
#        self.messages = []
#        self.name = "interface"

# NOTE: the definitions below are methods of a class whose header lies
# outside this excerpt; they are shown dedented.


def _add_channel(self, channel: Channel):
    """Register ``channel`` by appending it to ``self.channels``."""
    self.channels.append(channel)


def _add_message(self, message: Message):
    """Register ``message`` by appending it to ``self.messages``."""
    self.messages.append(message)


def _set_interface_name(self, name):
    """Store the interface name and derive the AsyncAPI ``id`` URN from it."""
    self.name = name
    self.asyncapi["id"] = f"urn:stingeripc:{name}"


def _set_interface_version(self, version_str):
    """
    Record the interface version and expose it on a dedicated channel.

    The version is written to ``asyncapi["info"]["version"]``. A
    ``stinger_version`` string message and a server-published
    ``<name>/stingerVersion`` channel are then registered.
    """
    self.asyncapi["info"]["version"] = version_str

    version_message = Message("stinger_version", {"type": "string"})
    version_channel = Channel(
        topic=f"{self.name}/stingerVersion",
        name="stinger_version",
        direction=Direction.SERVER_PUBLISHES,
        message_name=version_message.name,
    )
    # NOTE(review): presumably (qos, retain) - confirm against Channel.set_mqtt.
    version_channel.set_mqtt(1, True)

    self._add_channel(version_channel)
    self._add_message(version_message)


def _add_signal(self, signal_name, signal_def):
    """
    Register the message and the server-published channel for one signal.

    :param signal_name: Name of the signal; used for the topic and channel name.
    :param signal_def: Signal definition; its ``"payload"`` entry is referenced
        by the message.
    """
    signal_message = Message(f"{signal_name}Signal").set_reference(
        signal_def["payload"]
    )
    self._add_message(signal_message)

    signal_channel = Channel(
        f"{self.name}/{signal_name}",
        signal_name,
        Direction.SERVER_PUBLISHES,
        signal_message.name,
    )
    signal_channel.set_mqtt(2, False)
    self._add_channel(signal_channel)


def _add_param(self, param_name, param_def):
    """
    Register the message and both channels belonging to one parameter.

    A server-published ``<name>/<param>/value`` channel and a
    server-subscribed ``<name>/<param>/update`` channel are created; both
    use the same message.

    :param param_name: Name of the parameter.
    :param param_def: Parameter definition; its ``"type"`` entry is referenced
        by the message.
    """
    camel_name = stringmanip.upper_camel_case(param_name)
    param_message = Message(f"{camel_name}Param").set_reference(param_def["type"])
    self._add_message(param_message)

    value_channel = Channel(
        f"{self.name}/{param_name}/value",
        f"{param_name}Value",
        Direction.SERVER_PUBLISHES,
        param_message.name,
    )
    value_channel.set_mqtt(1, True)
    self._add_channel(value_channel)

    update_channel = Channel(
        f"{self.name}/{param_name}/update",
        f"Update{param_name}",
        Direction.SERVER_SUBSCRIBES,
        param_message.name,
    )
    update_channel.set_mqtt(1, True)
    self._add_channel(update_channel)


def add_stinger(self, stinger):
    """
    Load a Stinger description into this specification.

    :param stinger: Mapping holding the Stinger file contents. Its
        ``"stingeripc"`` entry must equal ``"0.0.1"``; ``"signals"`` and
        ``"params"`` are optional.
    :raises Exception: When the interface name cannot be looked up.
    """
    assert stinger["stingeripc"] == "0.0.1"

    try:
        self._set_interface_name(stinger["interface"]["name"])
    except KeyError:
        raise Exception("The Stinger File does not have a name")

    self._set_interface_version(stinger["interface"]["version"])

    if "signals" in stinger:
        for signal_name, signal_def in stinger["signals"].items():
            self._add_signal(signal_name, signal_def)

    if "params" in stinger:
        for name, definition in stinger["params"].items():
            self._add_param(name, definition)

# def get_asyncapi(self, client_type: SpecType, use_common=None):
# ... (excerpt truncated)
# test_filesystem.py
Source:test_filesystem.py  
# ... (excerpt begins inside an earlier method whose header is not shown)
#            try:
#                channel._qos._delivered.clear()
#            except AttributeError:
#                pass

# NOTE: the definitions below are methods of a test class whose header lies
# outside this excerpt; they are shown dedented.


def _add_channel(self, channel):
    """Remember ``channel`` in ``self.channels`` and hand it back to the caller."""
    self.channels.add(channel)
    return channel


def test_produce_consume_noack(self):
    """Publish ten messages and drain events until a no-ack consumer has all ten."""
    publish_channel = self._add_channel(self.p.channel())
    producer = Producer(publish_channel, self.e)
    consume_channel = self._add_channel(self.c.channel())
    consumer = Consumer(consume_channel, self.q, no_ack=True)

    for number in range(10):
        producer.publish({'foo': number},
                         routing_key='test_transport_filesystem')

    _received = []

    def callback(message_data, message):
        _received.append(message)

    consumer.register_callback(callback)
    consumer.consume()

    # Keep draining until every published message has arrived.
    while len(_received) != 10:
        self.c.drain_events()

    assert len(_received) == 10


def test_produce_consume(self):
    # NOTE(review): this test is cut off in the excerpt; only the visible
    # part is reproduced, unchanged.
    producer_channel = self._add_channel(self.p.channel())
    consumer_channel = self._add_channel(self.c.channel())
    producer = Producer(producer_channel, self.e)
    consumer1 = Consumer(consumer_channel, self.q)
    consumer2 = Consumer(consumer_channel, self.q2)
    self.q2(consumer_channel).declare()
    for i in range(10):
        producer.publish({'foo': i},
                         routing_key='test_transport_filesystem')
    for i in range(10):
        producer.publish({'foo': i},
                         routing_key='test_transport_filesystem2')
    _received1 = []
    _received2 = []

    def callback1(message_data, message):
        _received1.append(message)

# ... (excerpt truncated)
# LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing.
This tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 automation test minutes FREE!!
