Best Python code snippet using lemoncheesecake
channel_test.py
Source:channel_test.py  
...459        f3 = MethodFrame(42, 2, 3)460        c._pending_events = deque([f1, f2, 'cb', f3])461        expect(conn.send_frame).args(f1)462        expect(conn.send_frame).args(f2)463        c._flush_pending_events()464        assert_equals(deque(['cb', f3]), c._pending_events)465    def test_closed_cb_without_final_frame(self):466        c = Channel(mock(), None, self._CLASS_MAP)467        c._pending_events = 'foo'468        c._frame_buffer = 'foo'469        for val in c._class_map.values():470            expect(val._cleanup)471        expect(c._notify_close_listeners)472        c._closed_cb()473        assert_equals(deque([]), c._pending_events)474        assert_equals(deque([]), c._frame_buffer)475        assert_equals(None, c._connection)476        assert_false(hasattr(c, 'channel'))477        assert_false(hasattr(c, 'exchange'))...session.py
Source:session.py  
# Excerpt from session.py, restored to one statement per line (the listing had
# been collapsed, with its line numbers fused into the code).  These are
# methods of a class whose header lies outside the excerpt, hence the ``self``
# parameters.


def cursor(self, cursor):
    """Store *cursor* as ``self._local.cursor``.

    NOTE(review): any decorator belonging to this method sits above the
    excerpt and is not visible here -- presumably a property setter, since
    the other methods read ``self.cursor`` as an attribute; confirm against
    the full file.
    """
    self._local.cursor = cursor


def _hold_event(self, event):
    """Queue *event* on the current cursor instead of firing it right away."""
    self.cursor.pending_events.append(event)


def _flush_pending_events(self):
    """Fire every held event in insertion order, then empty the queue."""
    for event in self.cursor.pending_events:
        self.event_manager.fire(event)
    # Cleared in place so the cursor keeps the same list object.
    del self.cursor.pending_events[:]


def _discard_pending_event_if_any(self, event_class):
    """Drop the most recently held event if it is an *event_class* instance.

    Returns True when an event was dropped, False otherwise (including when
    nothing is pending).
    """
    if self.cursor.pending_events and isinstance(self.cursor.pending_events[-1], event_class):
        self.cursor.pending_events.pop()
        return True
    else:
        return False


def _discard_or_fire_event(self, event_class, event):
    """Cancel a still-pending *event_class* event, or else fire *event*.

    When the matching "start" event is still held (nothing flushed it in the
    meantime), it is discarded and *event* is not fired either, so neither
    half of the pair reaches the event manager.
    """
    discarded = self._discard_pending_event_if_any(event_class)
    if not discarded:
        self.event_manager.fire(event)


def _mark_location_as_failed(self, location):
    """Record *location* in the set of failed locations."""
    self._failures.add(location)


def is_successful(self, location=None):
    """Tell whether *location* (or, without one, everything) is free of failures.

    A falsy *location* is treated the same as no location: the result is then
    True only when no failure at all has been recorded.
    """
    if location:
        return location not in self._failures
    else:
        return len(self._failures) == 0


def start_step(self, description):
    """Begin a step named *description*, ending any step already in progress.

    The StepStartEvent is only held, not fired; it is fired by the next flush
    or discarded by ``end_step`` if it is still pending at that point.
    """
    self._end_step_if_any()
    self.cursor.step = description
    self._hold_event(
        events.StepStartEvent(self.cursor.location, description, _get_thread_id())
    )


# Alias: ``set_step`` is the very same function object as ``start_step``.
set_step = start_step


def end_step(self):
    """End the current step.

    Raises AssertionError if no step has been started.  The StepEndEvent is
    fired unless the step's StepStartEvent is still pending, in which case
    the start event is discarded instead.
    """
    assert self.cursor.step, "There is no started step"
    self._discard_or_fire_event(
        events.StepStartEvent, events.StepEndEvent(self.cursor.location, self.cursor.step, _get_thread_id())
    )
    self.cursor.step = None


def _end_step_if_any(self):
    """End the current step when there is one; otherwise do nothing."""
    if self.cursor.step:
        self.end_step()


def _log(self, level, content):
    """Flush held events, then fire a LogEvent carrying *level* and *content*.

    A *level* equal to ``Log.LEVEL_ERROR`` also marks the current location as
    failed.
    """
    self._flush_pending_events()
    if level == Log.LEVEL_ERROR:
        self._mark_location_as_failed(self.cursor.location)
    self.event_manager.fire(
        events.LogEvent(self.cursor.location, self.cursor.step, _get_thread_id(), level, content)
    )


def log_debug(self, content):
    """Log *content* at ``Log.LEVEL_DEBUG``."""
    return self._log(Log.LEVEL_DEBUG, content)


def log_info(self, content):
    """Log *content* at ``Log.LEVEL_INFO``."""
    return self._log(Log.LEVEL_INFO, content)


def log_warning(self, content):
    """Log *content* at ``Log.LEVEL_WARN``."""
    return self._log(Log.LEVEL_WARN, content)


def log_error(self, content):
    """Log *content* at ``Log.LEVEL_ERROR`` (marks the location as failed)."""
    return self._log(Log.LEVEL_ERROR, content)


def log_check(self, description, is_successful, details):
    """Flush held events, then fire a CheckEvent for this check.

    The current location is marked as failed only when *is_successful* is
    exactly ``False``; ``None`` or other falsy values do not mark it.
    """
    self._flush_pending_events()
    if is_successful is False:
        self._mark_location_as_failed(self.cursor.location)
    self.event_manager.fire(events.CheckEvent(
        self.cursor.location, self.cursor.step, _get_thread_id(), description, is_successful, details
    ))


def log_url(self, url, description):
    """Flush held events, then fire a LogUrlEvent for *url*."""
    self._flush_pending_events()
    self.event_manager.fire(
        events.LogUrlEvent(self.cursor.location, self.cursor.step, _get_thread_id(), url, description)
    )


@contextmanager
def prepare_attachment(self, filename, description, as_image=False):
    """Context manager yielding the path at which to write an attachment.

    The yielded path is ``<attachments dir>/<NNNN>_<filename>``, where NNNN
    is the zero-padded attachment counter after incrementing.  Once the
    ``with`` body finishes normally, held events are flushed and a
    LogAttachmentEvent is fired.  There is no ``try``/``finally`` around the
    ``yield``, so nothing is fired when the body raises.
    """
    # The counter update and directory creation happen under the lock; the
    # yield itself happens after the lock is released.
    with self._attachment_lock:
        attachment_filename = "%04d_%s" % (self._attachment_count + 1, filename)
        self._attachment_count += 1
        if not os.path.exists(self._attachments_dir):
            os.mkdir(self._attachments_dir)
    yield os.path.join(self._attachments_dir, attachment_filename)
    self._flush_pending_events()
    self.event_manager.fire(events.LogAttachmentEvent(
        self.cursor.location, self.cursor.step, _get_thread_id(),
        "%s/%s" % (_ATTACHMENTS_DIR, attachment_filename), description, as_image
    ))


def start_test_session(self):
    """Fire a TestSessionStartEvent built from ``self.report``."""
    self.event_manager.fire(events.TestSessionStartEvent(self.report))


def end_test_session(self):
    """Fire a TestSessionEndEvent built from ``self.report``."""
    self.event_manager.fire(events.TestSessionEndEvent(self.report))


def start_test_session_setup(self):
    """Point the cursor at the test-session setup and hold its start event."""
    self.cursor = _Cursor(ReportLocation.in_test_session_setup())
    self._hold_event(events.TestSessionSetupStartEvent())


def end_test_session_setup(self):
    """End any open step, then fire the setup end event or discard the pair."""
    self._end_step_if_any()
    self._discard_or_fire_event(events.TestSessionSetupStartEvent, events.TestSessionSetupEndEvent())

# ... (session.py excerpt ends here; the next listing on this page is channel.py)
Source:channel.py  
...285            # Else just pass it through. Note that this situation could happen286            # on any broker-initiated message.287            if ev == cb:288                self._pending_events.popleft()289                self._flush_pending_events()290                return ev291            elif cb in self._pending_events:292                raise ChannelError(293                    "Expected synchronous callback %s, got %s", ev, cb)294        # Return the passed-in callback by default295        return cb296    def _flush_pending_events(self):297        '''298        Send pending frames that are in the event queue.299        '''300        while len(self._pending_events) and \301                isinstance(self._pending_events[0], Frame):302            self._connection.send_frame(self._pending_events.popleft())303    def _closed_cb(self, final_frame=None):304        '''305        "Private" callback from the ChannelClass when a channel is closed. Only306        called after broker initiated close, or we receive a close_ok. Caller307        has the option to send a final frame, to be used to bypass any308        synchronous or otherwise-pending frames so that the channel can be309        cleanly closed.310        '''...Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
