Best Python code snippet using localstack_python
assertsql.py
Source:assertsql.py  
1# testing/assertsql.py2# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors3# <see AUTHORS file>4#5# This module is part of SQLAlchemy and is released under6# the MIT License: http://www.opensource.org/licenses/mit-license.php7import collections8import contextlib9import re10from .. import event11from .. import util12from ..engine import url13from ..engine.default import DefaultDialect14from ..engine.util import _distill_params15from ..schema import _DDLCompiles16class AssertRule(object):17    is_consumed = False18    errormessage = None19    consume_statement = True20    def process_statement(self, execute_observed):21        pass22    def no_more_statements(self):23        assert False, (24            "All statements are complete, but pending "25            "assertion rules remain"26        )27class SQLMatchRule(AssertRule):28    pass29class CursorSQL(SQLMatchRule):30    consume_statement = False31    def __init__(self, statement, params=None):32        self.statement = statement33        self.params = params34    def process_statement(self, execute_observed):35        stmt = execute_observed.statements[0]36        if self.statement != stmt.statement or (37            self.params is not None and self.params != stmt.parameters38        ):39            self.errormessage = (40                "Testing for exact SQL %s parameters %s received %s %s"41                % (42                    self.statement,43                    self.params,44                    stmt.statement,45                    stmt.parameters,46                )47            )48        else:49            execute_observed.statements.pop(0)50            self.is_consumed = True51            if not execute_observed.statements:52                self.consume_statement = True53class CompiledSQL(SQLMatchRule):54    def __init__(self, statement, params=None, dialect="default"):55        self.statement = statement56        self.params = params57        self.dialect = dialect58    def _compare_sql(self, execute_observed, received_statement):59        stmt = re.sub(r"[\n\t]", "", self.statement)60        return received_statement == stmt61    def _compile_dialect(self, execute_observed):62        if self.dialect == "default":63            return DefaultDialect()64        else:65            # ugh66            if self.dialect == "postgresql":67                params = {"implicit_returning": True}68            else:69                params = {}70            return url.URL(self.dialect).get_dialect()(**params)71    def _received_statement(self, execute_observed):72        """reconstruct the statement and params in terms73        of a target dialect, which for CompiledSQL is just DefaultDialect."""74        context = execute_observed.context75        compare_dialect = self._compile_dialect(execute_observed)76        if isinstance(context.compiled.statement, _DDLCompiles):77            compiled = context.compiled.statement.compile(78                dialect=compare_dialect,79                schema_translate_map=context.execution_options.get(80                    "schema_translate_map"81                ),82            )83        else:84            compiled = context.compiled.statement.compile(85                dialect=compare_dialect,86                column_keys=context.compiled.column_keys,87                inline=context.compiled.inline,88                schema_translate_map=context.execution_options.get(89                    "schema_translate_map"90                ),91            )92        _received_statement = re.sub(r"[\n\t]", "", util.text_type(compiled))93        parameters = execute_observed.parameters94        if not parameters:95            _received_parameters = [compiled.construct_params()]96        else:97            _received_parameters = [98                compiled.construct_params(m) for m in parameters99            ]100        return _received_statement, _received_parameters101    def process_statement(self, execute_observed):102        context = execute_observed.context103        _received_statement, _received_parameters = self._received_statement(104            execute_observed105        )106        params = self._all_params(context)107        equivalent = self._compare_sql(execute_observed, _received_statement)108        if equivalent:109            if params is not None:110                all_params = list(params)111                all_received = list(_received_parameters)112                while all_params and all_received:113                    param = dict(all_params.pop(0))114                    for idx, received in enumerate(list(all_received)):115                        # do a positive compare only116                        for param_key in param:117                            # a key in param did not match current118                            # 'received'119                            if (120                                param_key not in received121                                or received[param_key] != param[param_key]122                            ):123                                break124                        else:125                            # all keys in param matched 'received';126                            # onto next param127                            del all_received[idx]128                            break129                    else:130                        # param did not match any entry131                        # in all_received132                        equivalent = False133                        break134                if all_params or all_received:135                    equivalent = False136        if equivalent:137            self.is_consumed = True138            self.errormessage = None139        else:140            self.errormessage = self._failure_message(params) % {141                "received_statement": _received_statement,142                "received_parameters": _received_parameters,143            }144    def _all_params(self, context):145        if self.params:146            if util.callable(self.params):147                params = self.params(context)148            else:149                params = self.params150            if not isinstance(params, list):151                params = [params]152            return params153        else:154            return None155    def _failure_message(self, expected_params):156        return (157            "Testing for compiled statement %r partial params %s, "158            "received %%(received_statement)r with params "159            "%%(received_parameters)r"160            % (161                self.statement.replace("%", "%%"),162                repr(expected_params).replace("%", "%%"),163            )164        )165class RegexSQL(CompiledSQL):166    def __init__(self, regex, params=None):167        SQLMatchRule.__init__(self)168        self.regex = re.compile(regex)169        self.orig_regex = regex170        self.params = params171        self.dialect = "default"172    def _failure_message(self, expected_params):173        return (174            "Testing for compiled statement ~%r partial params %s, "175            "received %%(received_statement)r with params "176            "%%(received_parameters)r"177            % (178                self.orig_regex.replace("%", "%%"),179                repr(expected_params).replace("%", "%%"),180            )181        )182    def _compare_sql(self, execute_observed, received_statement):183        return bool(self.regex.match(received_statement))184class DialectSQL(CompiledSQL):185    def _compile_dialect(self, execute_observed):186        return execute_observed.context.dialect187    def _compare_no_space(self, real_stmt, received_stmt):188        stmt = re.sub(r"[\n\t]", "", real_stmt)189        return received_stmt == stmt190    def _received_statement(self, execute_observed):191        received_stmt, received_params = super(192            DialectSQL, self193        )._received_statement(execute_observed)194        # TODO: why do we need this part?195        for real_stmt in execute_observed.statements:196            if self._compare_no_space(real_stmt.statement, received_stmt):197                break198        else:199            raise AssertionError(200                "Can't locate compiled statement %r in list of "201                "statements actually invoked" % received_stmt202            )203        return received_stmt, execute_observed.context.compiled_parameters204    def _compare_sql(self, execute_observed, received_statement):205        stmt = re.sub(r"[\n\t]", "", self.statement)206        # convert our comparison statement to have the207        # paramstyle of the received208        paramstyle = execute_observed.context.dialect.paramstyle209        if paramstyle == "pyformat":210            stmt = re.sub(r":([\w_]+)", r"%(\1)s", stmt)211        else:212            # positional params213            repl = None214            if paramstyle == "qmark":215                repl = "?"216            elif paramstyle == "format":217                repl = r"%s"218            elif paramstyle == "numeric":219                repl = None220            stmt = re.sub(r":([\w_]+)", repl, stmt)221        return received_statement == stmt222class CountStatements(AssertRule):223    def __init__(self, count):224        self.count = count225        self._statement_count = 0226    def process_statement(self, execute_observed):227        self._statement_count += 1228    def no_more_statements(self):229        if self.count != self._statement_count:230            assert False, "desired statement count %d does not match %d" % (231                self.count,232                self._statement_count,233            )234class AllOf(AssertRule):235    def __init__(self, *rules):236        self.rules = set(rules)237    def process_statement(self, execute_observed):238        for rule in list(self.rules):239            rule.errormessage = None240            rule.process_statement(execute_observed)241            if rule.is_consumed:242                self.rules.discard(rule)243                if not self.rules:244                    self.is_consumed = True245                break246            elif not rule.errormessage:247                # rule is not done yet248                self.errormessage = None249                break250        else:251            self.errormessage = list(self.rules)[0].errormessage252class EachOf(AssertRule):253    def __init__(self, *rules):254        self.rules = list(rules)255    def process_statement(self, execute_observed):256        while self.rules:257            rule = self.rules[0]258            rule.process_statement(execute_observed)259            if rule.is_consumed:260                self.rules.pop(0)261            elif rule.errormessage:262                self.errormessage = rule.errormessage263            if rule.consume_statement:264                break265        if not self.rules:266            self.is_consumed = True267    def no_more_statements(self):268        if self.rules and not self.rules[0].is_consumed:269            self.rules[0].no_more_statements()270        elif self.rules:271            super(EachOf, self).no_more_statements()272class Or(AllOf):273    def process_statement(self, execute_observed):274        for rule in self.rules:275            rule.process_statement(execute_observed)276            if rule.is_consumed:277                self.is_consumed = True278                break279        else:280            self.errormessage = list(self.rules)[0].errormessage281class SQLExecuteObserved(object):282    def __init__(self, context, clauseelement, multiparams, params):283        self.context = context284        self.clauseelement = clauseelement285        self.parameters = _distill_params(multiparams, params)286        self.statements = []287class SQLCursorExecuteObserved(288    collections.namedtuple(289        "SQLCursorExecuteObserved",290        ["statement", "parameters", "context", "executemany"],291    )292):293    pass294class SQLAsserter(object):295    def __init__(self):296        self.accumulated = []297    def _close(self):298        self._final = self.accumulated299        del self.accumulated300    def assert_(self, *rules):301        rule = EachOf(*rules)302        observed = list(self._final)303        while observed:304            statement = observed.pop(0)305            rule.process_statement(statement)306            if rule.is_consumed:307                break308            elif rule.errormessage:309                assert False, rule.errormessage310        if observed:311            assert False, "Additional SQL statements remain"312        elif not rule.is_consumed:313            rule.no_more_statements()314@contextlib.contextmanager315def assert_engine(engine):316    asserter = SQLAsserter()317    orig = []318    @event.listens_for(engine, "before_execute")319    def connection_execute(conn, clauseelement, multiparams, params):320        # grab the original statement + params before any cursor321        # execution322        orig[:] = clauseelement, multiparams, params323    @event.listens_for(engine, "after_cursor_execute")324    def cursor_execute(325        conn, cursor, statement, parameters, context, executemany326    ):327        if not context:328            return329        # then grab real cursor statements and associate them all330        # around a single context331        if (332            asserter.accumulated333            and asserter.accumulated[-1].context is context334        ):335            obs = asserter.accumulated[-1]336        else:337            obs = SQLExecuteObserved(context, orig[0], orig[1], orig[2])338            asserter.accumulated.append(obs)339        obs.statements.append(340            SQLCursorExecuteObserved(341                statement, parameters, context, executemany342            )343        )344    try:345        yield asserter346    finally:347        event.remove(engine, "after_cursor_execute", cursor_execute)348        event.remove(engine, "before_execute", connection_execute)...switch_impl_test.py
Source:switch_impl_test.py  
1#!/usr/bin/env python2import unittest3import sys4import os.path5from copy import copy6sys.path.append(os.path.dirname(__file__) + "/../../..")7from pox.openflow.libopenflow_01 import *8from pox.openflow.switch_impl import *9class MockConnection(object):10  def __init__(self):11    self.ofp_handlers = {}12    self.received = []13  @property14  def last(self):15    return self.received[-1]16  def to_switch(self, msg):17    self.ofp_handlers[msg.header_type](msg)18  # from switch19  def send(self, msg):20    self.received.append(msg)21class SwitchImplTest(unittest.TestCase):22  def setUp(self):23    self.conn = MockConnection()24    self.switch = SwitchImpl(1, name="sw1")25    self.switch.set_connection(self.conn)26    self.packet = ethernet(src=EthAddr("00:00:00:00:00:01"), dst=EthAddr("00:00:00:00:00:02"),27            payload=ipv4(srcip=IPAddr("1.2.3.4"), dstip=IPAddr("1.2.3.5"),28                payload=udp(srcport=1234, dstport=53, payload="haha")))29  def test_hello(self):30    c = self.conn31    c.to_switch(ofp_hello(xid=123))32    self.assertEqual(len(c.received), 1)33    self.assertTrue(isinstance(c.last, ofp_hello),34          "should have received hello but got %s" % c.last)35  def test_echo_request(self):36    c = self.conn37    c.to_switch(ofp_echo_request(xid=123))38    self.assertEqual(len(c.received), 1)39    self.assertTrue(isinstance(c.last, ofp_echo_reply) and c.last.xid == 123,40          "should have received echo reply but got %s" % c.last)41  def test_barrier(self):42    c = self.conn43    c.to_switch(ofp_barrier_request(xid=123))44    self.assertEqual(len(c.received), 1)45    self.assertTrue(isinstance(c.last, ofp_barrier_reply) and c.last.xid == 123,46          "should have received echo reply but got %s" % c.last)47  def test_flow_mod(self):48    c = self.conn49    s = self.switch50    c.to_switch(ofp_flow_mod(xid=124, priority=1, match=ofp_match(in_port=1, nw_src="1.2.3.4")))51    self.assertEqual(len(c.received), 0)52    self.assertEqual(len(s.table), 1)53    e = s.table.entries[0]54    self.assertEqual(e.priority,1)55    self.assertEqual(e.match, ofp_match(in_port=1, nw_src="1.2.3.4"))56  def test_packet_out(self):57    c = self.conn58    s = self.switch59    received = []60    s.addListener(DpPacketOut, lambda(event): received.append(event))61    packet = self.packet62    c.to_switch(ofp_packet_out(data=packet, actions=[ofp_action_output(port=2)]))63    self.assertEqual(len(c.received), 0)64    self.assertEqual(len(received), 1)65    event = received[0]66    self.assertEqual(event.port.port_no,2)67    self.assertEqual(event.packet.pack(), packet.pack())68  def test_send_packet_in(self):69    c = self.conn70    s = self.switch71    s.send_packet_in(in_port=1, buffer_id=123, packet=self.packet, xid=314, reason=OFPR_NO_MATCH)72    self.assertEqual(len(c.received), 1)73    self.assertTrue(isinstance(c.last, ofp_packet_in) and c.last.xid == 314,74          "should have received packet_in but got %s" % c.last)75    self.assertEqual(c.last.in_port,1)76    self.assertEqual(c.last.buffer_id,123)77    self.assertEqual(c.last.data, self.packet.pack())78  def test_process_packet(self):79    c = self.conn80    s = self.switch81    received = []82    s.addListener(DpPacketOut, lambda(event): received.append(event))83    # no flow entries -> should result in a packet_in84    s.process_packet(self.packet, in_port=1)85    self.assertEqual(len(c.received), 1)86    self.assertTrue(isinstance(c.last, ofp_packet_in),87          "should have received packet_in but got %s" % c.last)88    self.assertTrue(c.last.buffer_id > 0)89    # let's send a flow_mod with a buffer id90    c.to_switch(ofp_flow_mod(xid=124, buffer_id=c.last.buffer_id, priority=1,91                             match=ofp_match(in_port=1, nw_src="1.2.3.4"),92                             actions = [ ofp_action_output(port=3) ]93                             ))94    # that should have send the packet out port 395    self.assertEqual(len(received), 1)96    event = received[0]97    self.assertEqual(event.port.port_no,3)98    self.assertEqual(event.packet, self.packet)99    # now the next packet should go through on the fast path100    c.received = []101    received = []102    s.process_packet(self.packet, in_port=1)103    self.assertEqual(len(c.received), 0)104    self.assertEqual(len(received), 1)105    event = received[0]106    self.assertEqual(event.port.port_no,3)107    self.assertEqual(event.packet, self.packet)108    109  def test_take_port_down(self):110    c = self.conn111    s = self.switch112    original_num_ports = len(self.switch.ports)113    p = self.switch.ports.values()[0]114    s.take_port_down(p)115    new_num_ports = len(self.switch.ports)116    self.assertTrue(new_num_ports == original_num_ports - 1, "Should have removed the port")117    self.assertEqual(len(c.received), 1)118    self.assertTrue(isinstance(c.last, ofp_port_status),119          "should have received port_status but got %s" % c.last)120    self.assertTrue(c.last.reason == OFPPR_DELETE)121  122  def test_bring_port_up(self):123    c = self.conn124    s = self.switch125    original_num_ports = len(self.switch.ports)126    p = ofp_phy_port(port_no=1234)127    s.bring_port_up(p)128    new_num_ports = len(self.switch.ports)129    self.assertTrue(new_num_ports == original_num_ports + 1, "Should have added the port")130    self.assertEqual(len(c.received), 1)131    self.assertTrue(isinstance(c.last, ofp_port_status),132          "should have received port_status but got %s" % c.last)133    self.assertTrue(c.last.reason == OFPPR_ADD)134if __name__ == '__main__':...update.py
Source:update.py  
1from sqlalchemy.orm.exc import (NoResultFound, MultipleResultsFound)2from app.modules.quiz.models import Quiz3import itertools4class Singleton(type):5    def __init__(cls,name, bases, dict):6        super(Singleton, cls).__init__(name, bases, dict)7        cls.instance = None8    def __call__(cls, *args, **kwargs):9        if cls.instance is None:10            cls.instance = super(Singleton, cls).__call__(*args, **kwargs)11        else:12            cls._instances[cls].__init__(*args, **kwargs)13        return cls.instance14class Update(object):15    __metaclass__ = Singleton16    def __init__(self, session):17        self.__session = session18    def update_quiz(self, quiz_received, quiz_code):19        # type: (object) -> object20        try:21            quiz_found = self.__session.query(Quiz).filter_by(code=quiz_code).one()22            quiz_found.name = quiz_received.name23            quiz_found.description = quiz_received.description24            quiz_found.max_score = quiz_received.max_score25            new_max_score_per_question = quiz_found.max_score / quiz_found.question_count26            questions_received = quiz_received.questions27            questions_found = quiz_found.questions28            questions_iterator = [questions_received, questions_found]29            for question_received, question_found in itertools.product(*questions_iterator):30                if question_received.code == question_found.code:31                    question_found.text = question_received.text32                    question_found.max_score = new_max_score_per_question33                    question_found.score = 0.034                    options_received = question_received.options35                    options_found = question_found.options36                    options_iterator = [options_received, options_found]37                    for option_received, option_found in itertools.product(*options_iterator):38                        if option_received.code == option_found.code:39                            option_found.text = option_received.text40                            option_found.is_correct = option_received.is_correct41                            option_found.is_selected = option_received.is_selected42            return quiz_found43        except NoResultFound:44            print "Quiz of code {} not found.".format(quiz_code)45            return "-"46    def answer_quiz(self, quiz_received, quiz_code):47        try:48            quiz_found = self.__session.query(Quiz).filter_by(code=quiz_code).one()49            questions_received = quiz_received.questions50            questions_found = quiz_found.questions51            questions_iterator = [questions_received, questions_found]52            for question_received, question_found in itertools.product(*questions_iterator):53                if question_received.code == question_found.code:54                    options_received = question_received.options55                    options_found = question_found.options56                    options_iterator = [options_received, options_found]57                    for option_received, option_found in itertools.product(*options_iterator):58                        if option_received.code == option_found.code:59                            option_found.is_selected = option_received.is_selected60            return quiz_found61        except NoResultFound:62            print "Quiz of code {} not found.".format(quiz_code)63            return "-"64    def grade_quiz(self, quiz_code):65        try:66            quiz = self.__session.query(Quiz).filter_by(code=quiz_code).one()67            total_score = 0.068            questions = quiz.questions69            for question in questions:70                options = question.options71                for option in options:72                    if option.is_selected and option.is_correct:73                        question.score = question.max_score74                total_score = total_score + question.score75            quiz.score = total_score76            return quiz77        except NoResultFound:78            print "Quiz of code {} not found.".format(quiz_code)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
