Best Python code snippet using localstack_python
db_connection.py
Source: db_connection.py
import psycopg2

from src.backend.models.dao.db_setup import DBAccess
from src.backend.models.dao.helpers.packager import Packager

# Entity Model class name -> table queried by the generic "get all" methods.
_TABLES = {
    'ProductModel': 'products',
    'LikedListModel': 'likedlist',
    'CartModel': 'cart',
    'UserModel': 'usr',
}

# Tables that update_element_attribute is allowed to touch.
_UPDATABLE_TABLES = frozenset(('usr', 'products', 'orders', 'likedlist', 'cart'))

# Products bought in one order; the column order is the one handed to
# OrderProduct.tuple_to_model.
_ORDER_PRODUCTS_SQL = """
    SELECT name, description, price_sold, quantity_bought, category, product_id
    FROM order_products NATURAL INNER JOIN products
    WHERE order_id_fk = %s AND product_id = product_id_fk
"""

# Totals of one order: (sum of price * quantity, sum of quantity).
_ORDER_TOTALS_SQL = """
    SELECT SUM(price_sold * quantity_bought) AS total, SUM(quantity_bought) AS total_order_quantity
    FROM orders INNER JOIN order_products op ON orders.order_id = op.order_id_fk
    WHERE order_id = %s
"""


def _escape_percent(fragment: str) -> str:
    """Escape ``%`` in a caller-supplied SQL fragment.

    Needed only when the fragment shares a statement with bound parameters,
    because psycopg2 then treats every bare ``%`` as a placeholder marker.
    """
    return fragment.replace('%', '%%')


def _group_order_limit(group_attribute, order_attribute, sort, limit) -> str:
    """Render the trailing GROUP BY / ORDER BY / LIMIT clauses of a query.

    GROUP BY is left out when no group attribute is given, and a ``None``
    limit is rendered as ``LIMIT ALL`` (PostgreSQL's "no limit") instead of
    the invalid ``LIMIT None``.
    """
    clauses = []
    if group_attribute:
        clauses.append("GROUP BY {}".format(group_attribute))
    clauses.append("ORDER BY {} {}".format(order_attribute, sort))
    clauses.append("LIMIT {}".format('ALL' if limit is None else limit))
    return "\n".join(clauses)


class BackEnd:
    """
        Data-access helpers that translate Entity Models into SQL statements and back.

        Every public method dispatches on ``model.__class__.__name__``. Arguments such as
        ``select_attributes``, ``filter_clause``, ``order_attribute``, ``sort``, ``change``,
        ``table`` and ``attribute`` are SQL fragments that are pasted into the statement
        verbatim, so they must never carry untrusted text. Plain values (primary keys and
        the fields of a model) are sent as bound parameters.
    """

    @classmethod
    def create_element(cls, model, user_id=None, prod_id=None):
        """
            Creates a new element within the corresponding Entity Model table in the database.
        :param model: Entity Model to store
        :param user_id: owner of the order / liked-list entry (OrderModel and LikedListModel only)
        :param prod_id: liked product (LikedListModel only)
        :return: for UserModel and ProductModel, the packaged row returned by the INSERT; for OrderModel,
                 the given model updated in place (or None when no user_id is given); None otherwise
        """
        kind = model.__class__.__name__
        if kind == 'UserModel':
            return cls.__db_fetch_one(
                """
                INSERT INTO usr (first_name, last_name, created_on, valid, password, phone, admin)
                VALUES (%s, %s, current_timestamp, false, %s, %s, false)
                RETURNING *
                """,
                'UserModel',
                (model.get_first_name(), model.get_last_name(), model.get_password(), model.get_phone_num()),
            )
        if kind == 'ProductModel':
            return cls.__db_fetch_one(
                """
                INSERT INTO products (name, description, price, category, stock, visible)
                VALUES (%s, %s, %s, %s, %s, true)
                RETURNING *
                """,
                'ProductModel',
                (model.get_name(), model.get_desc(), model.get_price(), model.get_category(), model.get_stock()),
            )
        if kind == 'OrderModel':
            if not user_id:
                return None
            return cls.__create_order(model, user_id)
        if kind == 'LikedListModel':
            cls.__db_run_command(
                """
                INSERT INTO likedlist (user_id, product_id, time_like)
                VALUES (%s, %s, current_timestamp)
                """,
                (user_id, prod_id),
            )
            return None
        if kind == 'CartModel':
            cls.__db_run_command(
                """
                INSERT INTO cart (product_id, user_id, product_quantity, product_price)
                VALUES (%s, %s, %s, %s)
                """,
                (model.get_product_id(), model.get_user_id(),
                 model.get_product_quantity(), model.get_product_price()),
            )
            return None
        return None

    @classmethod
    def __create_order(cls, model, user_id):
        """
            Stores an order and its products in a single transaction.
        :param model: OrderModel whose product list is stored; it receives the new order id, the time of
                      order and the order totals
        :param user_id: user placing the order
        :return: the given model; on a database error the error is printed, nothing is committed and the
                 model is returned with whatever fields had been set so far
        """
        db_connection = DBAccess().connect_to_db()
        try:
            cursor = db_connection.cursor()
            cursor.execute(
                """
                INSERT INTO orders (user_id, time_of_order)
                VALUES (%s, current_timestamp)
                RETURNING order_id, time_of_order
                """,
                (user_id,),
            )
            order_id, time_of_order = cursor.fetchone()
            model.set_order_id(order_id)
            model.set_time_of_order(str(time_of_order))
            for item in model.get_product_list():
                cursor.execute(
                    """
                    INSERT INTO order_products (order_id_fk, price_sold, quantity_bought, product_id_fk)
                    VALUES (%s, %s, %s, %s)
                    """,
                    (model.get_order_id(), item.price_sold, item.quantity_bought, item.product_id),
                )
            cursor.execute(_ORDER_TOTALS_SQL, (model.get_order_id(),))
            total, quantity = cursor.fetchone()
            model.set_order_total(total)
            model.set_total_product_quantity(quantity)
            db_connection.commit()
        except psycopg2.Error as e:
            print(e)
        finally:
            # Closing without a commit discards the half-written order.
            db_connection.close()
        return model

    @classmethod
    def get_element(cls, model, pk, select_attributes: str, helper=None):
        """
            Queries the corresponding entity table in the database for the given primary key.
        :param model: class instance of the desired Entity Model
        :param pk: primary key corresponding to the Entity Model
        :param select_attributes: attributes to retrieve from the database; for OrderModel either 'full'
                                  (order with its products and totals) or 'order' (order header only)
        :param helper: for LikedListModel, the user id to match together with the product id given as pk;
                       for ProductModel, the column compared against pk instead of product_id
        :return: Desired Entity Model with the information corresponding to the primary key queried
        :raises AttributeError: for an existing order when select_attributes is neither 'full' nor 'order'
        """
        kind = model.__class__.__name__
        columns = _escape_percent(select_attributes)
        if kind == 'UserModel':
            return cls.__db_fetch_one(
                "SELECT {} FROM usr WHERE user_id = %s".format(columns),
                'UserModel',
                (pk,),
            )
        if kind == 'ProductModel':
            if helper:
                # Here pk is compared against an arbitrary column, so callers may be passing an
                # already-quoted SQL literal; it is therefore still pasted in as-is.
                # NOTE(review): unsafe with untrusted input - confirm what callers pass.
                return cls.__db_fetch_one(
                    "SELECT {} FROM products WHERE {}={}".format(select_attributes, helper, pk),
                    'ProductModel',
                )
            return cls.__db_fetch_one(
                "SELECT {} FROM products WHERE product_id = %s".format(columns),
                'ProductModel',
                (pk,),
            )
        if kind == 'OrderModel':
            return cls.__get_order(pk, select_attributes)
        if kind == 'LikedListModel':
            if helper is not None:
                return cls.__db_fetch_one(
                    "SELECT {} FROM likedlist WHERE product_id = %s AND user_id = %s".format(columns),
                    'LikedListModel',
                    (pk, helper),
                )
            return cls.__db_fetch_one(
                "SELECT {} FROM likedlist WHERE product_id = %s".format(columns),
                'LikedListModel',
                (pk,),
            )
        if kind == 'CartModel':
            # TODO: implement logic
            return "goomba"
        return None

    @classmethod
    def __get_order(cls, pk, select_attributes):
        """
            Loads one order, optionally with its products and totals.
        :param pk: order id
        :param select_attributes: 'full' or 'order'
        :return: OrderModel, or None when the order does not exist or the database reports an error
        :raises AttributeError: when the order exists and select_attributes is not supported
        """
        # Imported here, as in the original code, rather than at module level.
        from src.backend.models.order import OrderModel
        order = OrderModel()
        db_connection = DBAccess().connect_to_db()
        try:
            cursor = db_connection.cursor()
            cursor.execute("SELECT user_id, time_of_order FROM orders WHERE order_id = %s", (pk,))
            row = cursor.fetchone()
            if not row:
                return None
            order.set_order_id(pk)
            order.set_user_id(row[0])
            order.set_time_of_order(str(row[1]))
            if select_attributes == 'full':
                cls.__load_order_details(cursor, order)
                return order
            if select_attributes == 'order':
                return order
            raise AttributeError("The requested order details are not supported.")
        except psycopg2.Error as e:
            print(e)
            return None
        finally:
            # Runs on every path, so the connection is released on success too.
            db_connection.close()

    @classmethod
    def __load_order_details(cls, cursor, order):
        """
            Adds the bought products and the order totals to an order that already has its id set.
        :param cursor: open database cursor
        :param order: OrderModel to complete
        """
        from src.backend.models.order import OrderProduct
        order_id = order.get_order_id()
        cursor.execute(_ORDER_PRODUCTS_SQL, (order_id,))
        for row in cursor.fetchall():
            product = OrderProduct()
            product.tuple_to_model(row)
            product.order_id = order_id
            order.add_product_to_model(product)
        cursor.execute(_ORDER_TOTALS_SQL, (order_id,))
        total, quantity = cursor.fetchone()
        order.set_order_total(total)
        order.set_total_product_quantity(quantity)

    @classmethod
    def delete_element(cls, model, pk: int, prod_id=None):
        """
            Removes a row from the corresponding table. Products are only hidden (visible = false).
        :param model: class instance of the desired Entity Model
        :param pk: primary key corresponding to the Entity Model; the user id for LikedListModel and CartModel
        :param prod_id: restricts a LikedListModel / CartModel deletion to one product of the user
        :return: True when the statement ran and was committed, False when the database reported an error,
                 None for an unsupported model
        """
        kind = model.__class__.__name__
        if kind == 'UserModel':
            return cls.__db_run_command("DELETE FROM usr WHERE user_id = %s", (pk,))
        if kind == 'ProductModel':
            return cls.__db_run_command("UPDATE products SET visible = false WHERE product_id = %s", (pk,))
        if kind == 'OrderModel':
            return cls.__db_run_command("DELETE FROM orders WHERE order_id = %s", (pk,))
        if kind in ('LikedListModel', 'CartModel'):
            table = _TABLES[kind]
            if prod_id is not None:
                return cls.__db_run_command(
                    "DELETE FROM {} WHERE user_id = %s AND product_id = %s".format(table),
                    (pk, prod_id),
                )
            return cls.__db_run_command("DELETE FROM {} WHERE user_id = %s".format(table), (pk,))
        return None

    @classmethod
    def get_all_elements(cls, model, select_attributes: str, filter_clause: str):
        """
            Queries the database for all the elements of the given corresponding Entity.
        :param model: class instance of the desired Entity Model
        :param select_attributes: attributes desired from the query; for OrderModel the first three must be
                                  the order id, the user id and the time of order, in that order
        :param filter_clause: specific filters desired for the query; an empty value means "no filter"
        :return: list containing the desired Entity Models of matching query results
        """
        kind = model.__class__.__name__
        if kind == 'OrderModel':
            return cls.__get_orders(select_attributes, filter_clause)
        table = _TABLES.get(kind)
        if table is None:
            return None
        conditions = []
        if filter_clause:
            conditions.append(filter_clause)
        if kind == 'ProductModel':
            # Hidden ("deleted") products are never listed.
            conditions.append("visible = true")
        return cls.__db_fetch_all(
            "SELECT {} FROM {}{}".format(select_attributes, table, cls.__where(conditions)),
            kind,
        )

    @classmethod
    def __get_orders(cls, select_attributes, filter_clause):
        """
            Loads every order matching the filter, each with its products and totals.
        :param select_attributes: columns selected from orders (order id, user id, time of order first)
        :param filter_clause: WHERE condition; empty to load the orders of every user
        :return: list of OrderModel, or None when anything goes wrong while loading
        """
        from src.backend.models.order import OrderModel
        query = "SELECT {} FROM orders".format(select_attributes)
        if filter_clause:
            query += " WHERE {}".format(filter_clause)
        db_connection = DBAccess().connect_to_db()
        try:
            cursor = db_connection.cursor()
            cursor.execute(query)
            orders_made = []
            for transaction in cursor.fetchall():
                order = OrderModel()
                order.set_order_id(transaction[0])
                order.set_user_id(transaction[1])
                order.set_time_of_order(transaction[2])
                cls.__load_order_details(cursor, order)
                orders_made.append(order)
            return orders_made
        except Exception as e:
            # Deliberately broad, as in the original: any failure yields None.
            print(e)
            return None
        finally:
            db_connection.close()

    @classmethod
    def get_all_elements_ordered(
            cls, model, select_attributes: str, filter_clause: str, order_attribute: str, sort: str, limit: int = 500):
        """
            Queries the database for all the elements of the given corresponding Entity in a specified order.
        :param model: class instance of the desired Entity Model
        :param select_attributes: attributes desired from the query
        :param filter_clause: specific filters desired for the query
        :param order_attribute: specific attribute to order
        :param sort: ascending (ASC) or descending (DESC)
        :param limit: limit the amount of results; the default 500 means "no limit, visible products only",
                      any other value adds a LIMIT and does not filter on visibility
        :return: list containing the desired Entity Models of matching query results in the specified order
        """
        kind = model.__class__.__name__
        if kind == 'ProductModel':
            unlimited = limit == 500
            conditions = []
            if filter_clause:
                conditions.append(filter_clause)
            if unlimited:
                conditions.append("visible = true")
            query = "SELECT {} FROM products{} ORDER BY {} {}".format(
                select_attributes, cls.__where(conditions), order_attribute, sort)
            if not unlimited:
                query += " LIMIT {}".format(limit)
            return cls.__db_fetch_all(query, 'ProductModel')
        if kind in ('OrderModel', 'LikedListModel', 'CartModel'):
            # TODO: implement logic
            return "goomba"
        return None

    @classmethod
    def get_all_unique_values(cls, table: str, attribute: str):
        """
            Queries the database and returns a list of all the unique values of a given attribute found in the
            given table.
        :param table: table that contains the attribute
        :param attribute: attribute to get unique values from
        :return: packaged list of distinct values; None when the attribute mentions the password column
        """
        # Refuse any spelling of the password column ('PASSWORD', 'usr.password', ...).
        if 'password' in attribute.lower():
            return None
        query = "SELECT DISTINCT {} FROM {}".format(attribute, table)
        if table.lower() == 'products':
            query += " WHERE visible = true"
        return cls.__db_fetch_all(query, 'List')

    @classmethod
    def update_element_attribute(cls, table: str, change: str, filter_clause: str):
        """
            Updates a specific attribute in a given table. Unknown tables are ignored.
        :param table: table where the change is applied (usr, products, orders, likedlist or cart)
        :param change: specific change to attribute (the SET expression)
        :param filter_clause: specific filters for desired change
        """
        if table.lower() in _UPDATABLE_TABLES:
            cls.__db_run_command("UPDATE {} SET {} WHERE {}".format(table, change, filter_clause))

    @staticmethod
    def __where(conditions):
        """
            Joins conditions into a WHERE clause (with a leading space); empty when there are none.
        :param conditions: list of SQL boolean expressions
        """
        if not conditions:
            return ""
        if len(conditions) == 1:
            return " WHERE {}".format(conditions[0])
        # Parenthesised so that an OR inside one condition cannot bypass the others.
        return " WHERE " + " AND ".join("({})".format(condition) for condition in conditions)

    @classmethod
    def __db_fetch_one(cls, command: str, return_type: str, params=None):
        """
            Backend private function. Runs a query that returns a single result.
        :param command: command to be run in database
        :param return_type: type of entity model desired (String)
        :param params: values bound to the %s placeholders of the command, if any
        :return: desired Entity Model of matching query result
        :raises psycopg2.Error: when the statement fails; the error is printed and the connection closed first
        """
        db_connection = DBAccess().connect_to_db()
        try:
            cursor = db_connection.cursor()
            cursor.execute(command, params)
            response = cursor.fetchone()
            db_connection.commit()
        except psycopg2.Error as e:
            print(e)
            raise
        finally:
            db_connection.close()
        return Packager.package_response(response, return_type)

    @classmethod
    def __db_fetch_all(cls, command: str, return_type: str, categories=False, params=None):
        """
            Backend private function. Runs a query that returns multiple results.
        :param command: command to be run in database
        :param return_type: type of entity model desired (String)
        :param categories: forwarded unchanged to the packager
        :param params: values bound to the %s placeholders of the command, if any
        :return: list containing the desired Entity Models of matching query results
        :raises psycopg2.Error: when the statement fails; the error is printed and the connection closed first
        """
        db_connection = DBAccess().connect_to_db()
        try:
            cursor = db_connection.cursor()
            cursor.execute(command, params)
            response = cursor.fetchall()
            db_connection.commit()
        except psycopg2.Error as e:
            print(e)
            raise
        finally:
            db_connection.close()
        return Packager().package_response(response, return_type, categories)

    @classmethod
    def __db_run_command(cls, command: str, params=None):
        """
            Backend private function. Runs a statement that returns no rows; errors are printed, not raised.
        :param command: command to be run in database
        :param params: values bound to the %s placeholders of the command, if any
        :return: True when the statement was executed and committed, False on a database error
        """
        db_connection = DBAccess().connect_to_db()
        try:
            db_connection.cursor().execute(command, params)
            db_connection.commit()
        except psycopg2.Error as e:
            print(e)
            return False
        finally:
            db_connection.close()
        return True

    @classmethod
    def __liked_list_grouped(cls, select_attributes, group_attribute, order_attribute, sort, limit):
        """
            Runs the grouped / ordered / limited liked-list query shared by the two methods below.
        :return: list of packaged LikedListModel results
        """
        return cls.__db_fetch_all(
            "SELECT {}\nFROM likedlist\n{}".format(
                select_attributes, _group_order_limit(group_attribute, order_attribute, sort, limit)),
            'LikedListModel'
        )

    @classmethod
    def get_elements_beta(
            cls, model, select_attributes: str, filter_clause: str, order_attribute: str, group_attribute: str,
            sort: str, limit: int = None, categories: bool = False):
        """
            Queries the database for grouped elements of the given corresponding Entity in a specified order.
        :param model: class instance of the desired Entity Model (OrderProduct or LikedListModel)
        :param select_attributes: attributes desired from the query
        :param filter_clause: specific filters desired for the query (OrderProduct only); empty for none
        :param order_attribute: specific attribute to order
        :param group_attribute: attribute(s) to group by
        :param sort: ascending (ASC) or descending (DESC)
        :param limit: maximum number of rows; None for no limit
        :param categories: forwarded to the packager (OrderProduct only)
        :return: list containing the desired Entity Models of matching query results in the specified order
        """
        kind = model.__class__.__name__
        tail = _group_order_limit(group_attribute, order_attribute, sort, limit)
        if kind == 'OrderProduct':
            if not filter_clause:
                where = "product_id = product_id_fk AND order_id = order_id_fk"
            else:
                # NOTE(review): unlike the unfiltered query, this one does not add
                # "order_id = order_id_fk"; presumably the filter is expected to carry it - confirm.
                where = "{} AND product_id = product_id_fk".format(filter_clause)
            return cls.__db_fetch_all(
                "SELECT {}\n"
                "FROM orders NATURAL INNER JOIN order_products NATURAL INNER JOIN products\n"
                "WHERE {}\n{}".format(select_attributes, where, tail),
                'OrderProduct',
                categories
            )
        if kind == 'LikedListModel':
            return cls.__liked_list_grouped(select_attributes, group_attribute, order_attribute, sort, limit)
        return None

    @classmethod
    def get_elements_join(
            cls, model, select_attributes: str, on: str, filter_clause: str, order_attribute: str,
            sort: str, limit: int = None, categories: bool = False, group_attribute: str = None):
        """
            Queries joined order / product data (or the liked list) in a specified order.
        :param model: class instance of the desired Entity Model (OrderProduct or LikedListModel)
        :param select_attributes: attributes desired from the query
        :param on: only tested for emptiness: '' selects the order_products / products join without a filter
        :param filter_clause: text placed directly in front of the join condition, so a non-empty filter
                              must end with its own connective (e.g. "user_id = 1 AND")
        :param order_attribute: specific attribute to order
        :param sort: ascending (ASC) or descending (DESC)
        :param limit: maximum number of rows; None for no limit
        :param categories: forwarded to the packager (OrderProduct only)
        :param group_attribute: attribute(s) to group by; None for no grouping
        :return: list containing the desired Entity Models of matching query results in the specified order
        """
        kind = model.__class__.__name__
        tail = _group_order_limit(group_attribute, order_attribute, sort, limit)
        if kind == 'OrderProduct':
            if on == '':
                # WHERE has to precede GROUP BY / ORDER BY; the other way round is a syntax error.
                source = "order_products NATURAL INNER JOIN products"
                where = "product_id = product_id_fk"
            elif group_attribute:
                source = "order_products NATURAL INNER JOIN orders NATURAL INNER JOIN products"
                where = "{} product_id = product_id_fk AND order_id = order_id_fk".format(filter_clause)
            else:
                # NOTE(review): this branch does not add "order_id = order_id_fk" - confirm intent.
                source = "order_products NATURAL INNER JOIN orders NATURAL INNER JOIN products"
                where = "{} product_id = product_id_fk".format(filter_clause)
            return cls.__db_fetch_all(
                "SELECT {}\nFROM {}\nWHERE {}\n{}".format(select_attributes, source, where, tail),
                'OrderProduct',
                categories
            )
        if kind == 'LikedListModel':
            # NOTE(review): the visible listing is cut off inside this call, right after the
            # 'LikedListModel' argument; it is closed here assuming no further argument followed.
            return cls.__liked_list_grouped(select_attributes, group_attribute, order_attribute, sort, limit)
        return None

# (listing truncated here in the source page; the next snippet is test_elements.py)
Source:test_elements.py  
...191            )192            == "3"193        )194@pytest.fixture195def select_attributes(make_confirm):196    return {197        "action_id": "action_id",198        "placeholder": "placeholder",199        "confirm": make_confirm(),200    }201class TestSelect:202    def test(self, snapshot, select_attributes):203        snapshot.assert_match(MockSelect(**select_attributes))204    def test_validate_placeholder(self, select_attributes):205        with pytest.raises(ValidationError):206            select_attributes["placeholder"] = "x" * 151207            MockSelect(**select_attributes)208    def test_required_placeholder(self, select_attributes):209        with pytest.raises(ValidationError):210            select_attributes["placeholder"] = None211            MockSelect(**select_attributes)212class TestStaticSelect:213    @pytest.fixture214    def attributes(self, make_confirm, select_attributes):215        return {216            **select_attributes,217            "initial_option": Option(text="text", value="value"),218        }219    def test_options(self, snapshot, attributes):220        attributes["options"] = [Option(text="text", value="value")]221        snapshot.assert_match(StaticSelect(**attributes))222    def test_option_groups(self, snapshot, attributes):223        attributes["option_groups"] = [224            OptionGroup(label="label", options=[Option(text="text", value="value")])225        ]226        snapshot.assert_match(StaticSelect(**attributes))227    def test_validate_options_or_options_group(self, attributes):228        attributes["options"] = [Option(text="text", value="value")]229        attributes["option_groups"] = [230            OptionGroup(label="label", options=[Option(text="text", value="value")])231        ]232        with pytest.raises(ValidationError):233            StaticSelect(**attributes)234    @pytest.mark.parametrize(235        "field, length", [("options", 100), ("option_groups", 100),],236    )237    def 
test_validate_length(self, attributes, field, length):238        attributes[field] = "x" * (length + 1)239        with pytest.raises(ValidationError):240            StaticSelect(**attributes)241    def test_parse_value(self):242        value = "option_1"243        assert StaticSelect.parse_value({"selected_option": {"value": value}}) == value244    def test_parse_value__nothing_selected(self):245        assert StaticSelect.parse_value({}) is None246class TestConversationSelect:247    def test(self, snapshot, select_attributes):248        snapshot.assert_match(249            ConversationSelect(initial_conversation="1", **select_attributes)250        )251    def test_parse_value(self):252        assert (253            ConversationSelect.parse_value({"selected_conversation": "value"})254            == "value"255        )256class TestChannelSelect:257    def test(self, snapshot, select_attributes):258        snapshot.assert_match(ChannelSelect(initial_channel="1", **select_attributes))259    def test_parse_value(self):260        assert ChannelSelect.parse_value({"selected_channel": "value"}) == "value"261class TestUserSelect:262    def test(self, snapshot, select_attributes):263        snapshot.assert_match(UserSelect(initial_user="1", **select_attributes))264    def test_parse_value(self):265        assert UserSelect.parse_value({"selected_user": "value"}) == "value"266@pytest.fixture267def multiselect_attributes(select_attributes):268    return {"max_selected_items": 2, **select_attributes}269class TestMultiSelect:270    def test(self, snapshot, multiselect_attributes):271        snapshot.assert_match(MockMultiSelect(**multiselect_attributes))272    def test_validate_max_selected_items(self, multiselect_attributes):273        with pytest.raises(ValidationError):274            multiselect_attributes["max_selected_items"] = 0275            MockMultiSelect(**multiselect_attributes)276class TestStaticMultiSelect:277    @pytest.fixture278    def attributes(self, 
make_confirm, multiselect_attributes):279        return {280            **multiselect_attributes,281            "initial_options": [Option(text="text", value="value")],...mozilla_parser.py
Source:mozilla_parser.py  
from moz_sql_parser import parse
import json

sql_statement = "select ename,location from EMP"


def _is_qualified_by(operand, relation_names):
    """Return True if *operand* is a dotted string whose prefix is in *relation_names*."""
    # Literals (numbers, nested mappings, ...) can never be relation-qualified names.
    if not isinstance(operand, str) or "." not in operand:
        return False
    return operand.split(".")[0] in relation_names


def extract_join_attributes(where_list, relation_names):
    """Split WHERE predicates into join predicates and selection predicates.

    A predicate counts as a join predicate when both its first and its second
    operand are dotted names whose prefix (the text before the first ``.``)
    is found in *relation_names*.  Every other predicate, including any
    predicate keyed by ``"or"``, is returned as a selection predicate.

    :param where_list: iterable of predicate mappings (operator -> operands)
    :param relation_names: container the operand prefixes are looked up in
        with ``in``
    :return: tuple ``(join_attributes, select_attributes)``, two lists holding
        the original predicate mappings
    """
    join_attributes = []
    select_attributes = []
    # Walk every predicate that appears after WHERE.
    for predicate in where_list:
        left_side_join = False
        right_side_join = False
        for operator in predicate:
            if operator == "or":
                select_attributes.append(predicate)
                continue
            operands = predicate[operator]
            # Only binary predicates can relate two relations; unary or scalar
            # operands fall through to the selection list instead of raising.
            if isinstance(operands, (list, tuple)) and len(operands) >= 2:
                if _is_qualified_by(operands[0], relation_names):
                    left_side_join = True
                if _is_qualified_by(operands[1], relation_names):
                    right_side_join = True
            if left_side_join and right_side_join:
                join_attributes.append(predicate)
            else:
                select_attributes.append(predicate)
    return join_attributes, select_attributes


# Round-trip through JSON so the parse result is made of plain dicts and lists.
json_dump = json.dumps(parse(sql_statement))
json_dictionary = json.loads(json_dump)
print("dictionary", ':', json_dictionary)

leaf_attributes = json_dictionary["from"]

# NOTE(review): the parser appears to return a single mapping (not a list)
# when only one column is selected; normalise so both shapes are handled.
select_clause = json_dictionary["select"]
if isinstance(select_clause, dict):
    select_clause = [select_clause]
project_attributes = [d['value'] for d in select_clause]
print(project_attributes)

if "where" in json_dictionary:
    # Separate join predicates from selection predicates.
    # NOTE(review): a WHERE with a single condition appears to come back
    # without the "and" wrapper; treat it as a one-element predicate list.
    where_clause = json_dictionary["where"]
    where_list = where_clause["and"] if "and" in where_clause else [where_clause]
    join_attributes, select_attributes = extract_join_attributes(where_list, leaf_attributes)
    for attributes in join_attributes:
        for i in attributes:
            ...  # the listing is truncated here; the loop body is not visible

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios.
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
