How to use the _create_initial_data method in autotest

Best Python code snippet using autotest_python

test_smartquery.py

Source:test_smartquery.py Github

copy

Full Screen

...90async def session():91 async with in_tx(async_session) as db:92 yield db93class BaseTest:94 async def _create_initial_data(self, db):95 u1 = User(name="Bill u1")96 db.add(u1)97 await db.flush()98 u2 = User(name="Alex u2")99 db.add(u2)100 await db.flush()101 u3 = User(name="Bishop u3")102 db.add(u3)103 await db.flush()104 p11 = Post(id=11, body="1234567890123", archived=True, user=u1)105 db.add(p11)106 await db.flush()107 p12 = Post(id=12, body="1234567890", user=u1)108 db.add(p12)109 await db.flush()110 p21 = Post(id=21, body="p21 by u2", user=u2)111 db.add(p21)112 await db.flush()113 p22 = Post(id=22, body="p22 by u2", user=u2)114 db.add(p22)115 await db.flush()116 cm11 = Comment(117 id=11,118 body="cm11 to p11",119 user=u1,120 post=p11,121 rating=1,122 created_at=datetime(2014, 1, 1),123 )124 db.add(cm11)125 await db.flush()126 cm12 = Comment(127 id=12,128 body="cm12 to p12",129 user=u2,130 post=p12,131 rating=2,132 created_at=datetime(2015, 10, 20),133 )134 db.add(cm12)135 await db.flush()136 cm21 = Comment(137 id=21,138 body="cm21 to p21",139 user=u1,140 post=p21,141 rating=1,142 created_at=datetime(2015, 11, 21),143 )144 db.add(cm21)145 await db.flush()146 cm22 = Comment(147 id=22,148 body="cm22 to p22",149 user=u3,150 post=p22,151 rating=3,152 created_at=datetime(2016, 11, 20),153 )154 db.add(cm22)155 await db.flush()156 cm_empty = Comment(157 id=29,158 # no body159 # no user160 # no post161 # no rating162 )163 db.add(cm_empty)164 await db.flush()165 return u1, u2, u3, p11, p12, p21, p22, cm11, cm12, cm21, cm22, cm_empty166@pytest.mark.incremental167@pytest.mark.asyncio168@pytest.mark.usefixtures("setup_db", "session")169class TestFilterExpr(BaseTest):170 def test_filterable_attributes(self):171 assert set(User.filterable_attributes) == {172 # normal columns173 "id",174 "name",175 # relations176 "posts",177 "comments",178 "comments_",179 }180 assert "posts_viewonly" not in set(User.filterable_attributes)181 assert set(Post.filterable_attributes) == {182 
# normal columns183 "id",184 "body",185 "user_id",186 "archived",187 # relations188 "user",189 "comments",190 # hybrid attributes191 "public",192 # hybrid methods193 "is_public",194 "is_commented_by_user",195 }196 assert set(Comment.filterable_attributes) == {197 # normal columns198 "id",199 "body",200 "post_id",201 "user_id",202 "rating",203 "created_at",204 # hybrid attributes205 "user",206 "post",207 }208 async def test_incorrect_expr(self, session):209 with pytest.raises(KeyError):210 stmt = sa.select(Post).filter(*Post.filter_expr(INCORRECT_ATTR="nomatter"))211 _ = (await session.execute(stmt)).scalars().all()212 async def test_columns(self, session):213 (214 u1,215 u2,216 u3,217 p11,218 p12,219 p21,220 p22,221 cm11,222 cm12,223 cm21,224 cm22,225 cm_empty,226 ) = await self._create_initial_data(session)227 # users having posts which are commented by user 2228 stmt = sa.select(Post).filter(*Post.filter_expr(user=u1))229 posts = (await session.execute(stmt)).scalars().all()230 assert set(posts) == {p11, p12}231 stmt = sa.select(Post).filter(*Post.filter_expr(user=u1, archived=False))232 posts = (await session.execute(stmt)).scalars().all()233 assert set(posts) == {p12}234 async def test_hybrid_properties(self, session):235 (236 u1,237 u2,238 u3,239 p11,240 p12,241 p21,242 p22,243 cm11,244 cm12,245 cm21,246 cm22,247 cm_empty,248 ) = await self._create_initial_data(session)249 stmt = sa.select(Post).filter(*Post.filter_expr(public=True))250 public_posts = (await session.execute(stmt)).scalars().all()251 stmt = sa.select(Post).filter(*Post.filter_expr(archived=False))252 non_archived_posts = (await session.execute(stmt)).scalars().all()253 assert public_posts == non_archived_posts254 stmt = sa.select(Post).filter(*Post.filter_expr(public=True))255 public_posts = (await session.execute(stmt)).scalars().all()256 assert set(public_posts) == {p12, p21, p22}257 stmt = sa.select(Post).filter(*Post.filter_expr(archived=False))258 non_archived_posts = (await 
session.execute(stmt)).scalars().all()259 assert set(non_archived_posts) == {p12, p21, p22}260 async def test_hybrid_methods(self, session):261 (262 u1,263 u2,264 u3,265 p11,266 p12,267 p21,268 p22,269 cm11,270 cm12,271 cm21,272 cm22,273 cm_empty,274 ) = await self._create_initial_data(session)275 # posts which are commented by user 1276 stmt = sa.select(Post).filter(*Post.filter_expr(is_commented_by_user=u1))277 posts = (await session.execute(stmt)).scalars().all()278 assert set(posts) == {p11, p21}279 # posts which are commented by user 2280 stmt = sa.select(Post).filter(*Post.filter_expr(is_commented_by_user=u2))281 posts = (await session.execute(stmt)).scalars().all()282 assert set(posts) == {p12}283 async def test_combinations(self, session):284 (285 u1,286 u2,287 u3,288 p11,289 p12,290 p21,291 p22,292 cm11,293 cm12,294 cm21,295 cm22,296 cm_empty,297 ) = await self._create_initial_data(session)298 # non-public posts commented by user 1299 stmt = sa.select(Post).filter(300 *Post.filter_expr(public=False, is_commented_by_user=u1)301 )302 posts = (await session.execute(stmt)).scalars().all()303 assert set(posts) == {p11}304 async def test_operators(self, session):305 (306 u1,307 u2,308 u3,309 p11,310 p12,311 p21,312 p22,313 cm11,314 cm12,315 cm21,316 cm22,317 cm_empty,318 ) = await self._create_initial_data(session)319 async def test(filters, expected_result):320 stmt = sa.select(Comment).filter(*Comment.filter_expr(**filters))321 result = (await session.execute(stmt)).scalars().all()322 assert set(result) == expected_result323 # test incorrect attribute324 with pytest.raises(KeyError):325 await test(dict(rating__INCORRECT_OPERATOR="nomatter"), {"nomatter"})326 # not327 await test(dict(id__not=11), {cm12, cm21, cm22, cm_empty})328 # rating == None329 await test(dict(rating=None), {cm_empty})330 await test(dict(rating__isnull=2), {cm_empty})331 # rating == 2332 await test(dict(rating=2), {cm12}) # when no operator, 'exact' is assumed333 await 
test(dict(rating__exact=2), {cm12})334 # rating != 2335 await test(dict(rating__ne=2), {cm11, cm21, cm22})336 # rating > 2337 await test(dict(rating__gt=2), {cm22})338 # rating >= 2339 await test(dict(rating__ge=2), {cm12, cm22})340 # rating < 2341 await test(dict(rating__lt=2), {cm11, cm21})342 # rating <= 2343 await test(dict(rating__le=2), {cm11, cm12, cm21})344 # rating in [1,3]345 await test(dict(rating__in=[1, 3]), {cm11, cm21, cm22}) # list346 await test(dict(rating__in=(1, 3)), {cm11, cm21, cm22}) # tuple347 await test(dict(rating__in={1, 3}), {cm11, cm21, cm22}) # set348 # rating not in [1,3]349 await test(dict(rating__notin=[1, 3]), {cm12}) # list350 await test(dict(rating__notin=(1, 3)), {cm12}) # tuple351 await test(dict(rating__notin={1, 3}), {cm12}) # set352 # rating between 2 and 3353 await test(dict(rating__between=[2, 3]), {cm12, cm22}) # list354 await test(dict(rating__between=(2, 3)), {cm12, cm22}) # set355 # likes356 await test(dict(body__like="cm12 to p12"), {cm12})357 await test(dict(body__like="%cm12%"), {cm12})358 await test(dict(body__ilike="%CM12%"), {cm12})359 await test(dict(body__startswith="cm1"), {cm11, cm12})360 await test(dict(body__istartswith="CM1"), {cm11, cm12})361 await test(dict(body__endswith="to p12"), {cm12})362 await test(dict(body__iendswith="TO P12"), {cm12})363 # dates364 # year365 await test(dict(created_at__year=2014), {cm11})366 await test(dict(created_at__year=2015), {cm12, cm21})367 # month368 await test(dict(created_at__month=1), {cm11})369 await test(dict(created_at__month=11), {cm21, cm22})370 # day371 await test(dict(created_at__day=1), {cm11})372 await test(dict(created_at__day=20), {cm12, cm22})373 # whole date374 await test(375 dict(created_at__year=2014, created_at__month=1, created_at__day=1),376 expected_result={cm11},377 )378 await test(dict(created_at=datetime(2014, 1, 1)), {cm11})379 # date comparisons380 await test(dict(created_at__year_ge=2014), {cm11, cm12, cm21, cm22})381 await 
test(dict(created_at__year_gt=2014), {cm12, cm21, cm22})382 await test(dict(created_at__year_le=2015), {cm11, cm12, cm21})383 await test(dict(created_at__month_lt=10), {cm11})384@pytest.mark.incremental385@pytest.mark.asyncio386@pytest.mark.usefixtures("setup_db", "session")387class TestOrderExpr(BaseTest):388 async def test_incorrect_expr(self):389 with pytest.raises(KeyError):390 _ = sa.select(Post).filter(*Post.order_expr("INCORRECT_ATTR"))391 with pytest.raises(KeyError):392 _ = sa.select(Post).filter(*Post.order_expr("*body"))393 async def test_asc(self, session):394 (395 u1,396 u2,397 u3,398 p11,399 p12,400 p21,401 p22,402 cm11,403 cm12,404 cm21,405 cm22,406 cm_empty,407 ) = await self._create_initial_data(session)408 stmt = sa.select(Comment).order_by(*Comment.order_expr("rating"))409 comments = (await session.execute(stmt)).scalars().all()410 assert comments[-1] == cm_empty411 # cm11 and cm21 have equal ratings, so they can occur in any order412 assert set(comments[0:2]) == {cm11, cm21}413 assert comments[2:4] == [cm12, cm22]414 async def test_desc(self, session):415 (416 u1,417 u2,418 u3,419 p11,420 p12,421 p21,422 p22,423 cm11,424 cm12,425 cm21,426 cm22,427 cm_empty,428 ) = await self._create_initial_data(session)429 stmt = sa.select(Comment).order_by(*Comment.order_expr("-rating"))430 comments = (await session.execute(stmt)).scalars().all()431 assert comments[1:3] == [cm22, cm12]432 # cm11 and cm21 have equal ratings, so they can occur in any order433 assert set(comments[3:]) == {cm11, cm21}434 assert comments[0] == cm_empty435 async def test_hybrid_properties(self, session):436 (437 u1,438 u2,439 u3,440 p11,441 p12,442 p21,443 p22,444 cm11,445 cm12,446 cm21,447 cm22,448 cm_empty,449 ) = await self._create_initial_data(session)450 stmt = sa.select(Post).order_by(*Post.order_expr("public"))451 posts = (await session.execute(stmt)).scalars().all()452 assert posts[0] == p11453 stmt = sa.select(Post).order_by(*Post.order_expr("-public"))454 posts = (await 
session.execute(stmt)).scalars().all()455 assert posts[-1] == p11456 async def test_combinations(self, session):457 (458 u1,459 u2,460 u3,461 p11,462 p12,463 p21,464 p22,465 cm11,466 cm12,467 cm21,468 cm22,469 cm_empty,470 ) = await self._create_initial_data(session)471 """ test various combinations """472 # 1. rating ASC, created_at ASC473 stmt = sa.select(Comment).order_by(*Comment.order_expr("rating", "created_at"))474 comments = (await session.execute(stmt)).scalars().all()475 assert comments == [cm11, cm21, cm12, cm22, cm_empty]476 # 2. rating ASC, created_at DESC477 stmt = sa.select(Comment).order_by(*Comment.order_expr("rating", "-created_at"))478 comments = (await session.execute(stmt)).scalars().all()479 assert comments == [cm21, cm11, cm12, cm22, cm_empty]480 # 3. rating DESC, created_at ASC481 stmt = sa.select(Comment).order_by(*Comment.order_expr("-rating", "created_at"))482 comments = (await session.execute(stmt)).scalars().all()483 assert comments == [cm_empty, cm22, cm12, cm11, cm21]484 # 4. 
rating DESC, created_at DESC485 stmt = sa.select(Comment).order_by(486 *Comment.order_expr("-rating", "-created_at")487 )488 comments = (await session.execute(stmt)).scalars().all()489 assert comments == [cm_empty, cm22, cm12, cm21, cm11]490@pytest.mark.incremental491@pytest.mark.asyncio492@pytest.mark.usefixtures("setup_db", "session")493class TestSmartQueryFilters(BaseTest):494 async def test_incorrect_expr(self):495 with pytest.raises(KeyError):496 _ = User.where(INCORRECT_ATTR="nomatter")497 async def test_is_a_shortcut_to_filter_expr_in_simple_cases(self, session):498 """test when have no joins, where() is a shortcut for filter_expr"""499 stmt = sa.select(Comment).filter(500 *Comment.filter_expr(rating__gt=2, body__startswith="cm1")501 )502 comments_filters_expr = (await session.execute(stmt)).scalars().all()503 stmt = Comment.where(rating__gt=2, body__startswith="cm1")504 comments_where = (await session.execute(stmt)).scalars().all()505 assert comments_filters_expr == comments_where506 async def test_is_a_shortcut_to_smart_query(self, session):507 """test that where() is just a shortcut for smart_query()"""508 comments_where = (509 (await session.execute(Comment.where(rating__gt=2))).scalars().all()510 )511 comments_smart_query = (512 (await session.execute(Comment.smart_query(filters=dict(rating__gt=2))))513 .scalars()514 .all()515 )516 assert comments_where == comments_smart_query517 async def test_incorrect_relation_name(self):518 with pytest.raises(KeyError):519 _ = User.where(INCORRECT_RELATION="nomatter").all()520 with pytest.raises(KeyError):521 _ = User.where(post___INCORRECT_RELATION="nomatter").all()522 async def test_relations(self, session):523 (524 u1,525 u2,526 u3,527 p11,528 p12,529 p21,530 p22,531 cm11,532 cm12,533 cm21,534 cm22,535 cm_empty,536 ) = await self._create_initial_data(session)537 # users having posts which are commented by user 2538 comments = (539 (await session.execute(User.where(posts___comments___user_id=u2.id)))540 
.unique()541 .scalars()542 .all()543 )544 assert set(comments) == {u1}545 # comments where user name starts with 'Bi'546 comments = (547 (await session.execute(Comment.where(user___name__startswith="Bi")))548 .unique()549 .scalars()550 .all()551 )552 assert set(comments) == {cm11, cm21, cm22}553 # comments on posts where author name starts with 'Bi'554 # !! ATTENTION !!555 # about Comment.post:556 # although we have Post.comments relationship,557 # it's important to **add relationship Comment.post** too,558 # not just use backref !!!559 comments = (560 (await session.execute(Comment.where(post___user___name__startswith="Bi")))561 .unique()562 .scalars()563 .all()564 )565 assert set(comments) == {cm11, cm12}566 async def test_combinations(self, session):567 (568 u1,569 u2,570 u3,571 p11,572 p12,573 p21,574 p22,575 cm11,576 cm12,577 cm21,578 cm22,579 cm_empty,580 ) = await self._create_initial_data(session)581 # non-public posts commented by user 1582 comments = (583 (await session.execute(Post.where(public=False, is_commented_by_user=u1)))584 .unique()585 .scalars()586 .all()587 )588 assert set(comments) == {p11}589@pytest.mark.incremental590@pytest.mark.asyncio591@pytest.mark.usefixtures("setup_db", "session")592class TestSmartQuerySort(BaseTest):593 def test_incorrect_expr(self):594 with pytest.raises(KeyError):595 _ = Post.sort("INCORRECT_ATTR")596 with pytest.raises(KeyError):597 _ = Post.sort("*body")598 async def test_is_a_shortcut_to_order_expr_in_simple_cases(self, session):599 """when have no joins, sort() is a shortcut for order_expr"""600 comments_order_expr = (601 (602 await session.execute(603 sa.select(Comment).order_by(*Comment.order_expr("rating"))604 )605 )606 .unique()607 .scalars()608 .all()609 )610 comments_sort = (611 (await session.execute(Comment.sort("rating"))).unique().scalars().all()612 )613 assert comments_order_expr == comments_sort614 comments_order_expr = (615 (616 await session.execute(617 sa.select(Comment).order_by(618 
*Comment.order_expr("rating", "created_at")619 )620 )621 )622 .unique()623 .scalars()624 .all()625 )626 comments_sort = (627 (await session.execute(Comment.sort("rating", "created_at")))628 .unique()629 .scalars()630 .all()631 )632 assert comments_order_expr == comments_sort633 # hybrid properties634 posts_order_expr = (635 (636 await session.execute(637 sa.select(Post).order_by(*Post.order_expr("public"))638 )639 )640 .unique()641 .scalars()642 .all()643 )644 posts_sort = (645 (await session.execute(Post.sort("public"))).unique().scalars().all()646 )647 assert posts_order_expr == posts_sort648 async def test_is_a_shortcut_to_smart_query(self, session):649 """test that sort() is just a shortcut for smart_query()"""650 comments_sort = (651 (await session.execute(Comment.sort("rating"))).unique().scalars().all()652 )653 comments_smart_query = (654 (await session.execute(Comment.smart_query(sort_attrs=["rating"])))655 .unique()656 .scalars()657 .all()658 )659 assert comments_sort == comments_smart_query660 def test_incorrect_relation_name(self):661 with pytest.raises(KeyError):662 _ = User.sort("INCORRECT_RELATION")663 with pytest.raises(KeyError):664 _ = User.sort("post___INCORRECT_RELATION")665 async def test_relations(self, session):666 """test that sort() is just a shortcut for smart_query()"""667 (668 u1,669 u2,670 u3,671 p11,672 p12,673 p21,674 p22,675 cm11,676 cm12,677 cm21,678 cm22,679 cm_empty,680 ) = await self._create_initial_data(session)681 comments = (682 (await session.execute(Comment.sort("user___name")))683 .unique()684 .scalars()685 .all()686 )687 assert comments[:2], [cm_empty, cm12]688 assert set(comments[1:3]) == {cm11, cm21}689 assert comments[3] == cm22690 comments = (691 (await session.execute(Comment.sort("user___name", "-created_at")))692 .unique()693 .scalars()694 .all()695 )696 assert comments == [cm12, cm21, cm11, cm22, cm_empty]697 # hybrid_property698 comments = (699 (700 await session.execute(701 Comment.sort("-post___public", 
"post___user___name")702 )703 )704 .unique()705 .scalars()706 .all()707 )708 # posts by same user709 assert set(comments[1:3]) == {cm21, cm22}710 assert comments[2:], [cm12, cm11, cm_empty]711@pytest.mark.incremental712@pytest.mark.asyncio713@pytest.mark.usefixtures("setup_db", "session")714class TestFullSmartQuery(BaseTest):715 async def test_schema_with_strings(self, session):716 (717 u1,718 u2,719 u3,720 p11,721 p12,722 p21,723 p22,724 cm11,725 cm12,726 cm21,727 cm22,728 cm_empty,729 ) = await self._create_initial_data(session)730 # standalone function731 stmt = smart_query(732 Comment,733 filters={"post___public": True, "user__isnull": False},734 sort_attrs=["user___name", "-created_at"],735 schema={"post": {"user": JOINED}},736 )737 comments = (await session.execute(stmt)).unique().scalars().all()738 assert comments == [cm12, cm21, cm22]739 # class method740 stmt = Comment.smart_query(741 filters={"post___public": True, "user__isnull": False},742 sort_attrs=["user___name", "-created_at"],743 schema={"post": {"user": JOINED}},744 )745 comments = (await session.execute(stmt)).unique().scalars().all()746 assert comments == [cm12, cm21, cm22]747 async def test_schema_with_class_properties(self, session):748 (749 u1,750 u2,751 u3,752 p11,753 p12,754 p21,755 p22,756 cm11,757 cm12,758 cm21,759 cm22,760 cm_empty,761 ) = await self._create_initial_data(session)762 # standalone function763 stmt = smart_query(764 Comment,765 filters={"post___public": True, "user__isnull": False},766 sort_attrs=["user___name", "-created_at"],767 schema={Comment.post: {Post.user: JOINED}},768 )769 comments = (await session.execute(stmt)).unique().scalars().all()770 assert comments == [cm12, cm21, cm22]771 # class method772 stmt = Comment.smart_query(773 filters={"post___public": True, "user__isnull": False},774 sort_attrs=["user___name", "-created_at"],775 schema={Comment.post: {Post.user: JOINED}},776 )777 comments = (await session.execute(stmt)).unique().scalars().all()778 assert 
comments == [cm12, cm21, cm22]779@pytest.mark.incremental780@pytest.mark.asyncio781@pytest.mark.usefixtures("setup_db", "session")782class TestSmartQueryAutoEagerLoad(BaseTest):783 """784 Smart_query does auto-joins for filtering/sorting,785 so there's a sense to tell sqlalchemy that we alreeady joined that relation786 So we test that relations are set to be joinedload787 if they were used in smart_query()788 """789 async def _create_initial_data(self, session):790 result = await super()._create_initial_data(session)791 self.query_count = 0792 @sa.event.listens_for(engine.sync_engine, "before_cursor_execute")793 def before_cursor_execute(794 conn, cursor, statement, parameters, context, executemany795 ):796 self.query_count += 1797 return result798 async def test_sort(self, session):799 (800 u1,801 u2,802 u3,803 p11,804 p12,805 p21,806 p22,807 cm11,808 cm12,809 cm21,810 cm22,811 cm_empty,812 ) = await self._create_initial_data(session)813 self.query_count = 0814 comments = (815 (816 await session.execute(817 Comment.sort("-post___public", "post___user___name")818 )819 )820 .unique()821 .scalars()822 .all()823 )824 assert self.query_count == 1825 _ = comments[0].post826 # no additional query needed: we used 'post' relation in smart_query()827 assert self.query_count == 1828 _ = comments[1].post.user829 # no additional query needed: we used 'post' relation in smart_query()830 assert self.query_count == 1831 async def test_where(self, session):832 (833 u1,834 u2,835 u3,836 p11,837 p12,838 p21,839 p22,840 cm11,841 cm12,842 cm21,843 cm22,844 cm_empty,845 ) = await self._create_initial_data(session)846 self.query_count = 0847 comments = (848 (849 await session.execute(850 Comment.where(post___public=True, post___user___name__like="Bi%")851 )852 )853 .unique()854 .scalars()855 .all()856 )857 assert self.query_count == 1858 _ = comments[0].post859 # no additional query needed: we used 'post' relation in smart_query()860 assert self.query_count == 1861 _ = 
comments[0].post.user862 # no additional query needed: we used 'post' relation in smart_query()863 assert self.query_count == 1864 async def test_explicitly_set_in_schema_joinedload(self, session):865 """866 here we explicitly set in schema that we additionally want to load867 post___comments868 """869 (870 u1,871 u2,872 u3,873 p11,874 p12,875 p21,876 p22,877 cm11,878 cm12,879 cm21,880 cm22,881 cm_empty,882 ) = await self._create_initial_data(session)883 self.query_count = 0884 stmt = Comment.smart_query(885 filters=dict(post___public=True, post___user___name__like="Bi%"),886 schema={"post": {"comments": JOINED}},887 )888 comments = (await session.execute(stmt)).unique().scalars().all()889 assert self.query_count == 1890 _ = comments[0].post891 # no additional query needed: we used 'post' relation in smart_query()892 assert self.query_count == 1893 _ = comments[0].post.user894 # no additional query needed: we used 'post' relation in smart_query()895 assert self.query_count == 1896 async def test_explicitly_set_in_schema_subqueryload(self, session):897 """898 here we explicitly set in schema that we additionally want to load899 post___comments900 """901 (902 u1,903 u2,904 u3,905 p11,906 p12,907 p21,908 p22,909 cm11,910 cm12,911 cm21,912 cm22,913 cm_empty,914 ) = await self._create_initial_data(session)915 self.query_count = 0916 stmt = Comment.smart_query(917 filters=dict(post___public=True, post___user___name__like="Bi%"),918 schema={"post": {"comments": SUBQUERY}},919 )920 comments = (await session.execute(stmt)).unique().scalars().all()921 assert self.query_count == 2922 _ = comments[0].post923 # no additional query needed: we used 'post' relation in smart_query()924 assert self.query_count == 2925 _ = comments[0].post.user926 # no additional query needed: we used 'post' relation in smart_query()927 assert self.query_count == 2928 # we didn't use post___comments,929 # BUT we explicitly set it in schema!930 # so additional query is NOT needed931 _ = 
comments[0].post.comments932 assert self.query_count == 2933 async def test_override_eagerload_method_in_schema(self, session):934 """935 here we use 'post' relation in filters,936 but we want to load 'post' relation in SEPARATE QUERY (subqueryload)937 so we set load method in schema938 """939 (940 u1,941 u2,942 u3,943 p11,944 p12,945 p21,946 p22,947 cm11,948 cm12,949 cm21,950 cm22,951 cm_empty,952 ) = await self._create_initial_data(session)953 self.query_count = 0954 stmt = Comment.smart_query(955 filters=dict(post___public=True, post___user___name__like="Bi%"),956 schema={"post": SUBQUERY},957 )958 comments = (await session.execute(stmt)).unique().scalars().all()959 assert self.query_count == 2960 _ = comments[0].post961 # no additional query needed: we used 'post' relation in smart_query()962 assert self.query_count == 2963 # Test nested schemas964 self.query_count = 0965 stmt = Comment.smart_query(966 filters=dict(post___public=True, post___user___name__like="Bi%"),...

Full Screen

Full Screen

test_eagerload.py

Source:test_eagerload.py Github

copy

Full Screen

...48async def session():49 async with in_tx(async_session) as db:50 yield db51class BaseTest:52 async def _create_initial_data(self, db):53 u1 = User(name="Bill u1")54 db.add(u1)55 await db.flush()56 u2 = User(name="Alex u2")57 db.add(u2)58 await db.flush()59 u3 = User(name="Bishop u3")60 db.add(u3)61 await db.flush()62 p11 = Post(id=11, body="1234567890123", archived=True, user=u1)63 db.add(p11)64 await db.flush()65 p12 = Post(id=12, body="1234567890", user=u1)66 db.add(p12)67 await db.flush()68 p21 = Post(id=21, body="p21 by u2", user=u2)69 db.add(p21)70 await db.flush()71 p22 = Post(id=22, body="p22 by u2", user=u2)72 db.add(p22)73 await db.flush()74 cm11 = Comment(75 id=11,76 body="cm11 to p11",77 user=u1,78 post=p11,79 rating=1,80 )81 db.add(cm11)82 await db.flush()83 cm12 = Comment(84 id=12,85 body="cm12 to p12",86 user=u2,87 post=p12,88 rating=2,89 )90 db.add(cm12)91 await db.flush()92 cm21 = Comment(93 id=21,94 body="cm21 to p21",95 user=u1,96 post=p21,97 rating=1,98 )99 db.add(cm21)100 await db.flush()101 cm22 = Comment(102 id=22,103 body="cm22 to p22",104 user=u3,105 post=p22,106 rating=3,107 )108 db.add(cm22)109 await db.flush()110 cm_empty = Comment(111 id=29,112 # no body113 # no user114 # no post115 # no rating116 )117 db.add(cm_empty)118 await db.flush()119 return u1, u2, u3, p11, p12, p21, p22, cm11, cm12, cm21, cm22, cm_empty120@pytest.mark.incremental121@pytest.mark.asyncio122@pytest.mark.usefixtures("setup_db", "session")123class TestEagerExpr(BaseTest):124 async def _test_ok(self, session, schema):125 assert self.query_count == 0126 stmt = sa.select(User).options(*eager_expr(schema)).filter_by(id=1)127 user = (await session.execute(stmt)).scalar_one()128 assert self.query_count == 2129 # now, to get relationships, NO additional query is needed130 post = user.posts[0]131 _ = post.comments[0]132 assert self.query_count == 2133 async def _create_initial_data(self, session):134 result = await super()._create_initial_data(session)135 
self.query_count = 0136 @sa.event.listens_for(engine.sync_engine, "before_cursor_execute")137 def before_cursor_execute(138 conn, cursor, statement, parameters, context, executemany139 ):140 self.query_count += 1141 return result142 async def test_ok_strings(self, session):143 await self._create_initial_data(session)144 schema = {User.posts: (SUBQUERY, {Post.comments: JOINED})}145 await self._test_ok(session, schema)146 async def test_ok_class_properties(self, session):147 await self._create_initial_data(session)148 schema = {"posts": (SUBQUERY, {"comments": JOINED})}149 await self._test_ok(session, schema)150 async def test_bad_join_method(self, session):151 # None152 schema = {"posts": None}153 with pytest.raises(ValueError):154 sa.select(User).options(*eager_expr(schema))155 # strings156 schema = {157 "posts": ("WRONG JOIN METHOD", {Post.comments: "OTHER WRONG JOIN METHOD"})158 }159 with pytest.raises(ValueError):160 sa.select(User).options(*eager_expr(schema))161 # class properties162 schema = {163 User.posts: (164 "WRONG JOIN METHOD",165 {Post.comments: "OTHER WRONG JOIN METHOD"},166 )167 }168 with pytest.raises(ValueError):169 sa.select(User).options(*eager_expr(schema))170@pytest.mark.incremental171@pytest.mark.asyncio172@pytest.mark.usefixtures("setup_db", "session")173class TestOrmWithJoinedStrings(BaseTest):174 async def _create_initial_data(self, session):175 result = await super()._create_initial_data(session)176 self.query_count = 0177 @sa.event.listens_for(engine.sync_engine, "before_cursor_execute")178 def before_cursor_execute(179 conn, cursor, statement, parameters, context, executemany180 ):181 self.query_count += 1182 return result183 async def test(self, session):184 await self._create_initial_data(session)185 assert self.query_count == 0186 # take post with user and comments (including comment author)187 # NOTE: you can separate relations with dot.188 # Its due to SQLAlchemy: https://goo.gl/yM2DLX189 stmt = Post.with_joined("user", "comments", 
"comments.user")190 post = await session.scalar(stmt)191 assert self.query_count == 1192 # now, to get relationship, NO additional query is needed193 _ = post.user194 _ = post.comments[0]195 _ = post.comments[0].user196 assert self.query_count == 1197@pytest.mark.incremental198@pytest.mark.asyncio199@pytest.mark.usefixtures("setup_db", "session")200class TestOrmWithJoinedClassProperties(BaseTest):201 async def _create_initial_data(self, session):202 result = await super()._create_initial_data(session)203 self.query_count = 0204 @sa.event.listens_for(engine.sync_engine, "before_cursor_execute")205 def before_cursor_execute(206 conn, cursor, statement, parameters, context, executemany207 ):208 self.query_count += 1209 return result210 async def test(self, session):211 await self._create_initial_data(session)212 assert self.query_count == 0213 stmt = Post.with_joined(Post.comments, Post.user)214 post = await session.scalar(stmt)215 # now, to get relationship, NO additional query is needed216 _ = post.comments[0]217 _ = post.user218 assert self.query_count == 1219@pytest.mark.incremental220@pytest.mark.asyncio221@pytest.mark.usefixtures("setup_db", "session")222class TestOrmWithSubquery(BaseTest):223 async def _create_initial_data(self, session):224 result = await super()._create_initial_data(session)225 self.query_count = 0226 @sa.event.listens_for(engine.sync_engine, "before_cursor_execute")227 def before_cursor_execute(228 conn, cursor, statement, parameters, context, executemany229 ):230 self.query_count += 1231 return result232 async def test(self, session):233 await self._create_initial_data(session)234 assert self.query_count == 0235 # take post with user and comments (including comment author)236 # NOTE: you can separate relations with dot.237 # Its due to SQLAlchemy: https://goo.gl/yM2DLX238 stmt = Post.with_subquery("user", "comments", "comments.user")239 post = await session.scalar(stmt)240 # 3 queries were executed:241 # 1 - on posts242 # 2 - on user 
(eagerload subquery)243 # 3 - on comments (eagerload subquery)244 # 4 - on comments authors (eagerload subquery)245 assert self.query_count == 4246 # now, to get relationship, NO additional query is needed247 _ = post.user248 _ = post.comments[0]249 _ = post.comments[0].user250 assert self.query_count == 4251@pytest.mark.incremental252@pytest.mark.asyncio253@pytest.mark.usefixtures("setup_db", "session")254class TestOrmWithSubqueryClassProperties(BaseTest):255 async def _create_initial_data(self, session):256 result = await super()._create_initial_data(session)257 self.query_count = 0258 @sa.event.listens_for(engine.sync_engine, "before_cursor_execute")259 def before_cursor_execute(260 conn, cursor, statement, parameters, context, executemany261 ):262 self.query_count += 1263 return result264 async def test(self, session):265 await self._create_initial_data(session)266 assert self.query_count == 0267 stmt = Post.with_subquery(Post.comments, Post.user)268 post = await session.scalar(stmt)269 # 3 queries were executed:270 # 1 - on posts271 # 2 - on comments (eagerload subquery)272 # 3 - on user (eagerload subquery)273 assert self.query_count == 3274 # now, to get relationship, NO additional query is needed275 _ = post.comments[0]276 _ = post.user277 assert self.query_count == 3278@pytest.mark.incremental279@pytest.mark.asyncio280@pytest.mark.usefixtures("setup_db", "session")281class TestOrmWithDict(BaseTest):282 async def _test_joinedload(self, schema, session):283 assert self.query_count == 0284 stmt = Post.with_(schema)285 post = await session.scalar(stmt)286 assert self.query_count == 1287 # now, to get relationship, NO additional query is needed288 _ = post.comments[0]289 assert self.query_count == 1290 async def _create_initial_data(self, session):291 result = await super()._create_initial_data(session)292 self.query_count = 0293 @sa.event.listens_for(engine.sync_engine, "before_cursor_execute")294 def before_cursor_execute(295 conn, cursor, statement, 
parameters, context, executemany296 ):297 self.query_count += 1298 return result299 async def test_joinedload_strings(self, session):300 await self._create_initial_data(session)301 assert self.query_count == 0302 schema = {'comments': JOINED}303 await self._test_joinedload(schema, session)304 async def test_joinedload_class_properties(self, session):305 await self._create_initial_data(session)306 assert self.query_count == 0307 schema = {Post.comments: JOINED}308 await self._test_joinedload(schema, session)309 async def _test_subqueryload(self, schema, session):310 assert self.query_count == 0311 stmt = Post.with_(schema)312 post = await session.scalar(stmt)313 assert self.query_count == 2314 # to get relationship, NO additional query is needed315 _ = post.comments[0]316 assert self.query_count == 2317 async def test_subqueryload_strings(self, session):318 await self._create_initial_data(session)319 assert self.query_count == 0320 schema = {'comments': SUBQUERY}321 await self._test_subqueryload(schema, session)322 async def test_subqueryload_class_properties(self, session):323 await self._create_initial_data(session)324 assert self.query_count == 0325 schema = {Post.comments: SUBQUERY}326 await self._test_subqueryload(schema, session)327 async def _test_combined_load(self, schema, session):328 assert self.query_count == 0329 stmt = User.with_(schema)330 user = await session.scalar(stmt)331 assert self.query_count == 2332 # now, to get relationships, NO additional query is needed333 post = user.posts[0]334 _ = post.comments[0]335 assert self.query_count == 2336 async def test_combined_load_strings(self, session):337 await self._create_initial_data(session)338 assert self.query_count == 0339 schema = {340 User.posts: (SUBQUERY, {341 Post.comments: JOINED342 })343 }344 await self._test_combined_load(schema, session)345 async def test_combined_load_class_properties(self, session):346 await self._create_initial_data(session)347 assert self.query_count == 0348 schema = 
{349 'posts': (SUBQUERY, {350 'comments': JOINED351 })352 }...

Full Screen

Full Screen

customer_manager.py

Source:customer_manager.py Github

copy

Full Screen

...17 def __init__(self):18 super().__init__()19 # K:id, V:customer20 self._customers = {}21 self._create_initial_data()22 # TODO: remove this dummy data23 def _create_initial_data(self):24 cid = self._get_next_id()25 self._customers[cid] = Customer(cid, "Joe", "123 Here St, Boulder, CO", CustomerType.STANDARD)26 cid = self._get_next_id()27 self._customers[cid] = Customer(cid, "Betty", "456 There St, Boulder, CO", CustomerType.PREMIUM)28 def get_all(self):29 # TODO: cache this30 temp_list = [c for c in self._customers.values()]31 schema = CustomerSchema(many=True)32 return jsonify(schema.dump(temp_list).data)33 def get(self, customer_id):34 cid, err = BaseManager.convert_string_id(customer_id)35 if err:36 return err, HTTPStatus.BAD_REQUEST37 if cid in self._customers:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful