Best Python code snippet using playwright-python
_strategy.py
Source:_strategy.py  
...225                success = False226                reason = ("apply of stage id %s not supported during an abort"227                          % stage_id)228        return success, reason229    def _abort(self, stage_id=None):230        """231        Strategy Abort232        """233        if STRATEGY_PHASE.APPLY == self._current_phase:234            if stage_id is not None:235                if not self.apply_phase.is_inprogress() or \236                        stage_id != self.apply_phase.current_stage:237                    reason = "apply not inprogress for stage id %s" % stage_id238                    return False, reason239            if self._state in [STRATEGY_STATE.APPLYING, STRATEGY_STATE.APPLY_FAILED,240                               STRATEGY_STATE.APPLY_TIMEOUT]:241                self._state = STRATEGY_STATE.ABORTING242                abort_phase = self.apply_phase.abort()243                if not abort_phase:244                    abort_phase = StrategyPhase(STRATEGY_PHASE.ABORT)245                abort_phase.strategy = self246                self._phase[STRATEGY_PHASE.ABORT] = abort_phase247                # In the case of a single stage apply, if we are not currently248                # applying anything, we need to go to the aborted state now.249                if self.apply_phase.current_stage == self.apply_phase.stop_at_stage:250                    self._state = STRATEGY_STATE.ABORTED251                    self.abort_complete(STRATEGY_RESULT.ABORTED, "")252            elif STRATEGY_STATE.APPLIED != self._state:253                self._state = STRATEGY_STATE.ABORTED254            else:255                reason = "apply not inprogress"256                return False, reason257        else:258            reason = "apply not inprogress"259            return False, reason260        return True, ''261    def _handle_event(self, event, event_data=None):262        """263        Strategy Handle Event264        """265        handled = False266        if STRATEGY_STATE.BUILDING == self._state:267            if STRATEGY_PHASE.BUILD == self._current_phase:268                handled = self.build_phase.handle_event(event, event_data)269        elif STRATEGY_STATE.APPLYING == self._state:270            if STRATEGY_PHASE.APPLY == self._current_phase:271                handled = self.apply_phase.handle_event(event, event_data)272        elif STRATEGY_STATE.ABORTING == self._state:273            if STRATEGY_PHASE.APPLY == self._current_phase:274                handled = self.apply_phase.handle_event(event, event_data)275            elif STRATEGY_PHASE.ABORT == self._current_phase:276                handled = self.abort_phase.handle_event(event, event_data)277        return handled278    def phase_save(self):279        """280        Strategy Phase Save281        """282        self.save()283    def phase_extend_timeout(self, phase):284        """285        Strategy Phase Extend Timeout286        """287        phase.refresh_timeouts()288    def phase_complete(self, phase, phase_result, phase_result_reason=None):289        """290        Strategy Phase Complete291        """292        self.save()293        result, result_reason = \294            strategy_result_update(STRATEGY_RESULT.INITIAL, '',295                                   phase_result, phase_result_reason)296        if STRATEGY_STATE.BUILDING == self._state:297            if self._phase[STRATEGY_PHASE.BUILD] == phase:298                if phase.is_success() or phase.is_degraded():299                    self._state = STRATEGY_STATE.READY_TO_APPLY300                    self.build_complete(result, result_reason)301                elif phase.is_failed():302                    self._state = STRATEGY_STATE.BUILD_FAILED303                    self.build_complete(result, result_reason)304                elif phase.is_timed_out():305                    self._state = STRATEGY_STATE.BUILD_TIMEOUT306                    self.build_complete(result, result_reason)307        elif STRATEGY_STATE.APPLYING == self._state:308            if self._phase[STRATEGY_PHASE.APPLY] == phase:309                if phase.is_success() or phase.is_degraded():310                    self._state = STRATEGY_STATE.APPLIED311                    self.apply_complete(result, result_reason)312                elif phase.is_failed():313                    self._state = STRATEGY_STATE.APPLY_FAILED314                    self.apply_complete(result, result_reason)315                    self._abort()316                    self._current_phase = STRATEGY_PHASE.ABORT317                    self.abort_phase.apply()318                elif phase.is_timed_out():319                    self._state = STRATEGY_STATE.APPLY_TIMEOUT320                    self.apply_complete(result, result_reason)321                    self._abort()322                    self._current_phase = STRATEGY_PHASE.ABORT323                    self.abort_phase.apply()324        elif STRATEGY_STATE.ABORTING == self._state:325            if self._phase[STRATEGY_PHASE.APPLY] == phase:326                if phase.is_success() or phase.is_degraded():327                    self._state = STRATEGY_STATE.APPLIED328                    self.apply_complete(result, result_reason)329                elif phase.is_failed():330                    self._state = STRATEGY_STATE.APPLY_FAILED331                    self.apply_complete(result, result_reason)332                elif phase.is_timed_out():333                    self._state = STRATEGY_STATE.APPLY_TIMEOUT334                    self.apply_complete(result, result_reason)335                self._current_phase = STRATEGY_PHASE.ABORT336                self.abort_phase.apply()337            elif self._phase[STRATEGY_PHASE.ABORT] == phase:338                if phase.is_success() or phase.is_degraded():339                    self._state = STRATEGY_STATE.ABORTED340                    self.abort_complete(result, result_reason)341                elif phase.is_failed():342                    self._state = STRATEGY_STATE.ABORT_FAILED343                    self.abort_complete(result, result_reason)344                elif phase.is_timed_out():345                    self._state = STRATEGY_STATE.ABORT_TIMEOUT346                    self.abort_complete(result, result_reason)347        self.save()348    def refresh_timeouts(self):349        """350        Strategy Refresh Timeouts351        """352        self.build_phase.refresh_timeouts()353        self.apply_phase.refresh_timeouts()354        self.abort_phase.refresh_timeouts()355    def save(self):356        """357        Strategy Save (can be overridden by child class)358        """359        pass360    def build(self):361        """362        Strategy Build (can be overridden by child class)363        """364        self._build()365        self.save()366    def build_complete(self, result, result_reason):367        """368        Strategy Build Complete (can be overridden by child class)369        """370        self.save()371        return result, result_reason372    def apply(self, stage_id):373        """374        Strategy Apply (can be overridden by child class)375        """376        success, reason = self._apply(stage_id)377        self.save()378        return success, reason379    def apply_complete(self, result, result_reason):380        """381        Strategy Apply Complete (can be overridden by child class)382        """383        self.save()384        return result, result_reason385    def abort(self, stage_id):386        """387        Strategy Abort (can be overridden by child class)388        """389        success, reason = self._abort(stage_id)390        self.save()391        return success, reason392    def abort_complete(self, result, result_reason):393        """394        Strategy Abort Complete (can be overridden by child class)395        """396        self.save()397        return result, result_reason398    def handle_event(self, event, event_data=None):399        """400        Strategy Handle Event (can be overridden by child class)401        """402        return self._handle_event(event, event_data)403    def from_dict(self, data, build_phase=None, apply_phase=None, abort_phase=None):...tasks.py
Source:tasks.py  
1from datetime import datetime2from time import sleep3from subprocess import STDOUT, Popen, PIPE4import traceback5import logbook6from logbook import FileHandler, NestedSetup7from sqlalchemy import Column, DateTime, ForeignKey, Integer, String8from sqlalchemy.ext.declarative import declared_attr9from sqlalchemy.orm import relationship, backref10from db import Base, session11from db.fields import JSONEncodedDict12from log_utils import log_filename, get_thread_handlers13from thread_utils import make_exec_threaded, thread_wait14def action_fn(action):15    def do_action(instance):16        action_start_dt = getattr(instance, '%s_start_dt' % (action,), None)17        if not action_start_dt:18            instance.call_and_record_action(action)19        else:20            raise Exception('%s already started at %s' % 21                    (action, action_start_dt))22    return do_action23class Task(Base):24    __tablename__ = 'task'25    id = Column(Integer, primary_key=True)26    type = Column(String(50), nullable=False)27    rollout_id = Column(Integer, ForeignKey('rollout.id'), nullable=False)28    parent_id = Column(Integer, ForeignKey('task.id'))29    state = Column(JSONEncodedDict(1000))30    run_start_dt = Column(DateTime)31    run_error = Column(String(500))32    run_error_dt = Column(DateTime)33    run_return = Column(String(500))34    run_return_dt = Column(DateTime)35    run_traceback = Column(String(1000))36    revert_start_dt = Column(DateTime)37    revert_error = Column(String(500))38    revert_error_dt = Column(DateTime)39    revert_return = Column(String(500))40    revert_return_dt = Column(DateTime)41    revert_traceback = Column(String(1000))42    rollout = relationship('Rollout', backref=backref('tasks', order_by=id))43    children = relationship('Task', backref=backref('parent', remote_side='Task.id', order_by=id))44    @declared_attr45    def __mapper_args__(cls):46        # Implementing single table inheritance47        if cls.__name__ == 'Task':48            return {'polymorphic_on': cls.type,}49        else:50            return {'polymorphic_identity': cls.__name__}51    def __init__(self, rollout_id, *args, **kwargs):52        self.rollout_id = rollout_id53        self.state = {}54        self._init(*args, **kwargs)55    @classmethod56    def _from_id(cls, id):57        return session.Session.query(cls).get(id)58    def _init(self, *args, **kwargs):59        pass60    @classmethod61    def _run(cls, state, children, abort, term):62        pass63    @classmethod64    def _revert(cls, state, children, abort, term):65        pass66    run = action_fn('run')67    __revert = action_fn('revert')68    def revert(self):69        if not self.run_start_dt:70            raise Exception('Cannot revert before running')71        self.__revert()72    run_threaded = make_exec_threaded('run')73    revert_threaded = make_exec_threaded('revert')74    def get_signals(self, action):75        from rollout import Rollout76        signal_action = {'run': 'rollout', 'revert': 'rollback'}[action]77        abort_signal = Rollout.get_signal(self.rollout_id, 'abort_%s' % signal_action)78        term_signal = Rollout.get_signal(self.rollout_id, 'term_%s' % signal_action)79        if not abort_signal and term_signal:80            raise Exception('Cannot run: one or more signals don\'t exist: '81                    'abort: %s, term: %s' % (abort_signal, term_signal))82        return abort_signal, term_signal83    def call_and_record_action(self, action):84        setattr(self, '%s_start_dt' % (action,), datetime.now())85        self.save()86        # action_fn should be a classmethod, hence getattr explicitly on type87        action_fn = getattr(type(self), '_%s' % (action,))88        try:89            with self.log_setup_action(action):90                abort, term = self.get_signals(action)91                action_return = action_fn(self.state, self.children, abort, term)92        except Exception, e:93            setattr(self, '%s_error' % (action,), e.message)94            setattr(self, '%s_traceback' % (action,), repr(traceback.format_exc()))95            setattr(self, '%s_error_dt' % (action,), datetime.now())96            raise97        else:98            setattr(self, '%s_return' % (action,), action_return)99            setattr(self, '%s_return_dt' % (action,), datetime.now())100        finally:101            self.save()102    def log_setup_action(self, action):103        return NestedSetup(104                get_thread_handlers() +105                (FileHandler(log_filename(106                    self.rollout_id, self.id, action), bubble=True),))107    def save(self):108        if self not in session.Session:109            session.Session.add(self)110        session.Session.commit()111    def __repr__(self):112        return '<%s: id=%s, rollout_id=%s, state=%s>' % (113                self.__class__.__name__, self.id, self.rollout_id, repr(self.state))114    def friendly_str(self):115        return repr(self)116    def friendly_html(self):117        from kettleweb.app import url_for118        inner = [self.friendly_str()]119        for action in 'run', 'revert':120            if not getattr(self, '%s_start_dt' % action):121                continue122            log_url = url_for(123                    'log_view', rollout_id=self.rollout_id, args='/'.join(124                        map(str, (self.id, action))))125            log_link = '<a href="{url}">{action}</a>'.format(126                    url=log_url, action='%s log' % action.title())127            inner.append(log_link)128        return '<span class="task {class_}">{inner}</span>'.format(129                class_=self.status(), inner=' '.join(inner))130    def status(self):131        if not self.run_start_dt:132            return 'not_started'133        else:134            if not self.revert_start_dt:135                if not self.run_return_dt:136                    return 'started'137                else:138                    return 'finished'139            else:140                if not self.revert_return_dt:141                    return 'rolling_back'142                else:143                    return 'rolled_back'144class ExecTask(Task):145    desc_string = ''146    def _init(self, children, *args, **kwargs):147        self.save()148        for child in children:149            child.parent = self150            child.save()151    @classmethod152    def _run(cls, state, children, abort, term):153        cls.exec_forwards(state, children, abort, term)154    @classmethod155    def _revert(cls, state, children, abort, term):156        cls.exec_backwards(state, children, abort, term)157    def friendly_str(self):158        return '%s%s' %\159                (type(self).desc_string, ''.join(['\n\t%s' % child.friendly_str() for child in self.children]))160    def friendly_html(self):161        return '%s<ul>%s</ul>' %\162                (type(self).desc_string,163                ''.join(['<li>%s</li>' % child.friendly_html() for child in self.children]))164class SequentialExecTask(ExecTask):165    def _init(self, children, *args, **kwargs):166        super(SequentialExecTask, self)._init(children, *args, **kwargs)167        self.state['task_order'] = [child.id for child in children]168    @classmethod169    def exec_forwards(cls, state, tasks, abort, term):170        cls.exec_tasks('run_threaded', state['task_order'], tasks, abort, term)171    @classmethod172    def exec_backwards(cls, state, tasks, abort, term):173        run_tasks = [t for t in tasks if t.run_start_dt]174        run_task_ids = set(t.id for t in run_tasks)175        task_order = [t_id for t_id in reversed(state['task_order']) if t_id in run_task_ids]176        cls.exec_tasks('revert_threaded', task_order, run_tasks, abort, term)177    @staticmethod178    def exec_tasks(method_name, task_order, tasks, abort, term):179        task_ids = {task.id: task for task in tasks}180        for task_id in task_order:181            if abort.is_set() or term.is_set():182                break183            task = task_ids.pop(task_id)184            thread = getattr(task, method_name)(abort)185            thread_wait(thread, abort)186            if thread.exc_info is not None:187                raise Exception('Caught exception while executing task %s: %s' %188                        (task, thread.exc_info))189        else:190            assert not task_ids, ("SequentialExecTask has children that are not "191                    "in its task_order: %s" % (task_ids,))192    def friendly_html(self):193        task_order = self.state['task_order']194        child_ids = {c.id: c for c in self.children}195        return '%s<ul>%s</ul>' %\196                (type(self).desc_string,197                ''.join(['<li>%s</li>' % child_ids[id].friendly_html() for id in task_order]))198class ParallelExecTask(ExecTask):199    desc_string = 'Execute in parallel:'200    @classmethod201    def exec_forwards(cls, state, tasks, abort, term):202        cls.exec_tasks('run_threaded', tasks, abort, term)203    @classmethod204    def exec_backwards(cls, state, tasks, abort, term):205        cls.exec_tasks('revert_threaded', [t for t in tasks if t.run_start_dt],206                abort, term)207    @staticmethod208    def exec_tasks(method_name, tasks, abort, term):209        threads = []210        for task in tasks:211            if abort.is_set() or term.is_set():212                break213            thread = getattr(task, method_name)(abort)214            threads.append(thread)215        for thread in threads:216            thread_wait(thread, abort)217            if thread.exc_info is not None:218                raise Exception('Caught exception while executing task %s: %s' %219                        (thread.target, thread.exc_info))220class DelayTask(Task):221    def _init(self, minutes=0, seconds=0, reversible=False):222        self.state.update(223                minutes=minutes,224                seconds=seconds,225                reversible=reversible)226    @classmethod227    def _run(cls, state, children, abort, term):228        cls.wait(cls.get_secs(state), abort=abort, term=term)229    @classmethod230    def _revert(cls, state, children, abort, term):231        if state['reversible']:232            cls.wait(cls.get_secs(state), abort=abort, term=term)233    @classmethod234    def wait(cls, secs, abort, term):235        logbook.info('Waiting for %s' % (cls.min_sec_str(secs),),)236        for i in xrange(int(secs)):237            if abort.is_set() or term.is_set():238                break239            sleep(1)240    def friendly_str(self):241        delay_secs = self.get_secs(self.state)242        delay_str = self.min_sec_str(delay_secs)243        if self.run_start_dt and not self.run_return_dt:244            remaining_secs = delay_secs - (datetime.now() - self.run_start_dt).seconds245            remaining_str = ' (%s left)' % self.min_sec_str(remaining_secs)246        else:247            remaining_str = ''248        rev_str = ' (reversible)' if self.state['reversible'] else ''249        return 'Delay for {delay_str}{remaining_str}{rev_str}'.format(250                delay_str=delay_str, remaining_str=remaining_str, rev_str=rev_str)251    @staticmethod252    def min_sec_str(secs):253        mins = secs / 60254        secs = secs % 60255        if mins:256            return '%d:%02d mins' % (mins, secs)257        else:258            return '%d secs' % (secs,)259    @staticmethod260    def get_secs(state):261        return (state.get('minutes', 0) * 60) + state.get('seconds', 0)262def subprocess_run(command, abort, term, log=True, **kwargs):263    if log:264        logbook.info(command)265    p = Popen(266            args=command,267            stdout=PIPE,268            stderr=STDOUT,269            **kwargs270            )271    outputs = []272    for line in iter(p.stdout.readline, ''):273        if term and term.is_set():274            term = None # Don't spam calls to terminate275            p.terminate()276            logbook.info('Caught TERM signal: stopping')277        line = line.strip()278        outputs.append(line)279        if log:280            logbook.info(line)281    returncode = p.wait()282    if returncode == 0:283        return outputs284    else:285        if abort:286            abort.set()287        exc = Exception(command, outputs, returncode)...abort.py
Source:abort.py  
1"""2Test the server sasl aborting at different stages3"""4from servicetest import EventPattern5from gabbletest import exec_test, call_async6import constants as cs7from saslutil import SaslEventAuthenticator, connect_and_get_sasl_channel, \8    abort_auth9JID = "test@example.org"10PASSWORD = "pass"11EXCHANGE = [("", "remote challenge"),12            ("Another step", "Here we go"),13            ("local response", "")]14MECHANISMS = ["PLAIN", "DIGEST-MD5", "ABORT-TEST"]15def test_abort_early(q, bus, conn, stream):16    chan, props = connect_and_get_sasl_channel(q, bus, conn)17    abort_auth(q, chan, 31337, "maybe if I use an undefined code you'll crash")18def start_mechanism(q, bus, conn,19                    mechanism="ABORT-TEST", initial_response=EXCHANGE[0][1]):20    chan, props = connect_and_get_sasl_channel(q, bus, conn)21    chan.SASLAuthentication.StartMechanismWithData(mechanism, initial_response)22    q.expect('dbus-signal', signal='SASLStatusChanged',23             interface=cs.CHANNEL_IFACE_SASL_AUTH,24             args=[cs.SASL_STATUS_IN_PROGRESS, '', {}])25    e = q.expect('sasl-auth', initial_response=initial_response)26    return chan, e.authenticator27def test_abort_mid(q, bus, conn, stream):28    chan, authenticator = start_mechanism(q, bus, conn)29    authenticator.challenge(EXCHANGE[1][0])30    q.expect('dbus-signal', signal='NewChallenge',31                 interface=cs.CHANNEL_IFACE_SASL_AUTH,32                 args=[EXCHANGE[1][0]])33    abort_auth(q, chan, cs.SASL_ABORT_REASON_INVALID_CHALLENGE,34               "wrong data from server")35def test_disconnect_mid(q, bus, conn, stream):36    chan, authenticator = start_mechanism(q, bus, conn)37    authenticator.challenge(EXCHANGE[1][0])38    q.expect('dbus-signal', signal='NewChallenge',39                 interface=cs.CHANNEL_IFACE_SASL_AUTH,40                 args=[EXCHANGE[1][0]])41    call_async(q, conn, 'Disconnect')42    q.expect_many(EventPattern('dbus-signal', signal='StatusChanged',43                               args=[cs.CONN_STATUS_DISCONNECTED,44                                     cs.CSR_REQUESTED]),45                  EventPattern('dbus-return', method='Disconnect'))46def test_abort_connected(q, bus, conn, stream):47    initial_response = '\0' + JID.split('@')[0] + '\0' + PASSWORD48    chan, authenticator = start_mechanism(q, bus, conn,49        mechanism='PLAIN',50        initial_response=initial_response)51    authenticator.success(None)52    q.expect('dbus-signal', signal='SASLStatusChanged',53             interface=cs.CHANNEL_IFACE_SASL_AUTH,54             args=[cs.SASL_STATUS_SERVER_SUCCEEDED, '', {}])55    chan.SASLAuthentication.AcceptSASL()56    q.expect('dbus-signal', signal='SASLStatusChanged',57             interface=cs.CHANNEL_IFACE_SASL_AUTH,58             args=[cs.SASL_STATUS_SUCCEEDED, '', {}])59    q.expect('dbus-signal', signal='StatusChanged',60             args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])61    call_async(q, chan.SASLAuthentication, 'AbortSASL',62            cs.SASL_ABORT_REASON_USER_ABORT, "aborting too late")63    q.expect('dbus-error', method='AbortSASL', name=cs.NOT_AVAILABLE)64    chan.Close()65def test_give_up_while_waiting(q, bus, conn, stream,66                               channel_method,67                               authenticator_method):68    """Regression test for https://bugs.freedesktop.org/show_bug.cgi?id=5214669    where closing the auth channel while waiting for a challenge from the70    server would not abort the SASL exchange, and the challenge subsequently71    arriving would make Gabble assert.72    """73    chan, authenticator = start_mechanism(q, bus, conn)74    # While Gabble is waiting for waiting for the server to make the next move,75    # a Telepathy client does something to try to end the authentication76    # process.77    channel_method(chan)78    # And then we hear something back from the server.79    authenticator_method(authenticator)80    # FIXME: Gabble should probably send <abort/> and wait for the server to81    # say <failure><aborted/> rather than unceremoniously closing the82    # connection.83    #84    # In the bug we're testing for, the stream connection would indeed be lost,85    # but Gabble would also crash and leave a core dump behind. So this test86    # would appear to pass, but 'make check' would fail as we want.87    q.expect_many(88        EventPattern('stream-connection-lost'),89        EventPattern('dbus-signal', signal='ConnectionError'),90        EventPattern(91            'dbus-signal', signal="StatusChanged",92            args=[cs.CONN_STATUS_DISCONNECTED,93                  cs.CSR_AUTHENTICATION_FAILED]),94        )95def test_close_then_challenge(q, bus, conn, stream):96    """This is the specific scenario for which 52146 was reported. The channel97    was being closed because its handler crashed.98    """99    test_give_up_while_waiting(q, bus, conn, stream,100        lambda chan: chan.Close(),101        lambda authenticator: authenticator.challenge(EXCHANGE[1][0]))102def test_close_then_success(q, bus, conn, stream):103    test_give_up_while_waiting(q, bus, conn, stream,104        lambda chan: chan.Close(),105        lambda authenticator: authenticator.success())106def test_close_then_failure(q, bus, conn, stream):107    test_give_up_while_waiting(q, bus, conn, stream,108        lambda chan: chan.Close(),109        lambda authenticator: authenticator.not_authorized())110def test_abort_then_challenge(q, bus, conn, stream):111    test_give_up_while_waiting(q, bus, conn, stream,112        lambda chan: chan.SASLAuthentication.AbortSASL(113            cs.SASL_ABORT_REASON_USER_ABORT, "bored now"),114        lambda authenticator: authenticator.challenge(EXCHANGE[1][0]))115def test_abort_then_success(q, bus, conn, stream):116    """FIXME: this test fails because the channel changes its state from117    TP_SASL_STATUS_CLIENT_FAILED to TP_SASL_STATUS_SERVER_SUCCEEDED and waits118    for the client to ack it when <success> arrives, which is dumb.119    """120    test_give_up_while_waiting(q, bus, conn, stream,121        lambda chan: chan.SASLAuthentication.AbortSASL(122            cs.SASL_ABORT_REASON_USER_ABORT, "bored now"),123        lambda authenticator: authenticator.success())124def test_abort_then_failure(q, bus, conn, stream):125    test_give_up_while_waiting(q, bus, conn, stream,126        lambda chan: chan.SASLAuthentication.AbortSASL(127            cs.SASL_ABORT_REASON_USER_ABORT, "bored now"),128        lambda authenticator: authenticator.not_authorized())129def abort_and_close(chan):130    chan.SASLAuthentication.AbortSASL(131        cs.SASL_ABORT_REASON_USER_ABORT, "bored now")132    chan.Close()133def test_abort_and_close_then_challenge(q, bus, conn, stream):134    test_give_up_while_waiting(q, bus, conn, stream,135        abort_and_close,136        lambda authenticator: authenticator.challenge(EXCHANGE[1][0]))137def test_abort_and_close_then_failure(q, bus, conn, stream):138    test_give_up_while_waiting(q, bus, conn, stream,139        abort_and_close,140        lambda authenticator: authenticator.not_authorized())141def exec_test_(func):142    # Can't use functools.partial, because the authenticator is stateful.143    authenticator = SaslEventAuthenticator(JID.split('@')[0], MECHANISMS)144    exec_test(func, do_connect=False, authenticator=authenticator,145        params={'password': None,146                'account' : JID,147               })148if __name__ == '__main__':149    exec_test_(test_abort_early)150    exec_test_(test_abort_mid)151    exec_test_(test_disconnect_mid)152    exec_test_(test_abort_connected)153    exec_test_(test_close_then_challenge)154    exec_test_(test_close_then_success)155    exec_test_(test_close_then_failure)156    exec_test_(test_abort_then_challenge)157    # exec_test_(test_abort_then_success)158    exec_test_(test_abort_then_failure)159    exec_test_(test_abort_and_close_then_challenge)...misc_test.py
Source:misc_test.py  
...3import webob.exc4import webapp25import test_base6class TestMiscellaneous(test_base.BaseTestCase):7    def test_abort(self):8        self.assertRaises(webob.exc.HTTPOk, webapp2.abort, 200)9        self.assertRaises(webob.exc.HTTPCreated, webapp2.abort, 201)10        self.assertRaises(webob.exc.HTTPAccepted, webapp2.abort, 202)11        self.assertRaises(webob.exc.HTTPNonAuthoritativeInformation, webapp2.abort, 203)12        self.assertRaises(webob.exc.HTTPNoContent, webapp2.abort, 204)13        self.assertRaises(webob.exc.HTTPResetContent, webapp2.abort, 205)14        self.assertRaises(webob.exc.HTTPPartialContent, webapp2.abort, 206)15        self.assertRaises(webob.exc.HTTPMultipleChoices, webapp2.abort, 300)16        self.assertRaises(webob.exc.HTTPMovedPermanently, webapp2.abort, 301)17        self.assertRaises(webob.exc.HTTPFound, webapp2.abort, 302)18        self.assertRaises(webob.exc.HTTPSeeOther, webapp2.abort, 303)19        self.assertRaises(webob.exc.HTTPNotModified, webapp2.abort, 304)20        self.assertRaises(webob.exc.HTTPUseProxy, webapp2.abort, 305)21        self.assertRaises(webob.exc.HTTPTemporaryRedirect, webapp2.abort, 307)...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
