How to use expect_response method in Playwright Python

Best Python code snippet using playwright-python

test_content_libraries.py

Source:test_content_libraries.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""3Tests for Blockstore-based Content Libraries4"""5from uuid import UUID6import ddt7from django.conf import settings8from django.contrib.auth.models import Group9from django.test.utils import override_settings10from mock import patch11from organizations.models import Organization12from openedx.core.djangoapps.content_libraries.tests.base import ContentLibrariesRestApiTest, elasticsearch_test13from openedx.core.djangoapps.content_libraries.constants import VIDEO, COMPLEX, PROBLEM, CC_4_BY, ALL_RIGHTS_RESERVED14from common.djangoapps.student.tests.factories import UserFactory15@ddt.ddt16@elasticsearch_test17class ContentLibrariesTest(ContentLibrariesRestApiTest):18 """19 General tests for Blockstore-based Content Libraries20 These tests use the REST API, which in turn relies on the Python API.21 Some tests may use the python API directly if necessary to provide22 coverage of any code paths not accessible via the REST API.23 In general, these tests should24 (1) Use public APIs only - don't directly create data using other methods,25 which results in a less realistic test and ties the test suite too26 closely to specific implementation details.27 (Exception: users can be provisioned using a user factory)28 (2) Assert that fields are present in responses, but don't assert that the29 entire response has some specific shape. That way, things like adding30 new fields to an API response, which are backwards compatible, won't31 break any tests, but backwards-incompatible API changes will.32 WARNING: every test should have a unique library slug, because even though33 the django/mysql database gets reset for each test case, the lookup between34 library slug and bundle UUID does not because it's assumed to be immutable35 and cached forever.36 """37 def test_library_crud(self):38 """39 Test Create, Read, Update, and Delete of a Content Library40 """41 # Create:42 lib = self._create_library(43 slug="lib-crud", title="A Test Library", description="Just Testing", license_type=CC_4_BY,44 )45 expected_data = {46 "id": "lib:CL-TEST:lib-crud",47 "org": "CL-TEST",48 "slug": "lib-crud",49 "title": "A Test Library",50 "description": "Just Testing",51 "version": 0,52 "type": COMPLEX,53 "license": CC_4_BY,54 "has_unpublished_changes": False,55 "has_unpublished_deletes": False,56 }57 self.assertDictContainsEntries(lib, expected_data)58 # Check that bundle_uuid looks like a valid UUID59 UUID(lib["bundle_uuid"]) # will raise an exception if not valid60 # Read:61 lib2 = self._get_library(lib["id"])62 self.assertDictContainsEntries(lib2, expected_data)63 # Update:64 lib3 = self._update_library(lib["id"], title="New Title")65 expected_data["title"] = "New Title"66 self.assertDictContainsEntries(lib3, expected_data)67 # Delete:68 self._delete_library(lib["id"])69 # And confirm it is deleted:70 self._get_library(lib["id"], expect_response=404)71 self._delete_library(lib["id"], expect_response=404)72 @ddt.data(VIDEO, PROBLEM, COMPLEX)73 def test_library_alternative_type(self, target_type):74 """75 Create a library with a specific type76 """77 lib = self._create_library(78 slug="some-slug", title="Video Library", description="Test Video Library", library_type=target_type,79 )80 expected_data = {81 "id": "lib:CL-TEST:some-slug",82 "org": "CL-TEST",83 "slug": "some-slug",84 "title": "Video Library",85 "type": target_type,86 "description": "Test Video Library",87 "version": 0,88 "has_unpublished_changes": False,89 "has_unpublished_deletes": False,90 "license": ALL_RIGHTS_RESERVED,91 }92 self.assertDictContainsEntries(lib, expected_data)93 # Need to use a different slug each time here. Seems to be a race condition on test cleanup that will break things94 # otherwise.95 @ddt.data(96 ('to-video-fail', COMPLEX, VIDEO, (("problem", "problemA"),), 400),97 ('to-video-empty', COMPLEX, VIDEO, tuple(), 200),98 ('to-problem', COMPLEX, PROBLEM, (("problem", "problemB"),), 200),99 ('to-problem-fail', COMPLEX, PROBLEM, (("video", "videoA"),), 400),100 ('to-problem-empty', COMPLEX, PROBLEM, tuple(), 200),101 ('to-complex-from-video', VIDEO, COMPLEX, (("video", "videoB"),), 200),102 ('to-complex-from-problem', PROBLEM, COMPLEX, (("problem", "problemC"),), 200),103 ('to-complex-from-problem-empty', PROBLEM, COMPLEX, tuple(), 200),104 ('to-problem-from-video-empty', PROBLEM, VIDEO, tuple(), 200),105 )106 @ddt.unpack107 def test_library_update_type_conversion(self, slug, start_type, target_type, xblock_specs, expect_response):108 """109 Test conversion of one library type to another. Restricts options based on type/block matching.110 """111 lib = self._create_library(112 slug=slug, title="A Test Library", description="Just Testing", library_type=start_type,113 )114 self.assertEqual(lib['type'], start_type)115 for block_type, block_slug in xblock_specs:116 self._add_block_to_library(lib['id'], block_type, block_slug)117 self._commit_library_changes(lib['id'])118 result = self._update_library(lib['id'], type=target_type, expect_response=expect_response)119 if expect_response == 200:120 self.assertEqual(result['type'], target_type)121 self.assertIn('type', result)122 else:123 lib = self._get_library(lib['id'])124 self.assertEqual(lib['type'], start_type)125 def test_no_convert_on_unpublished(self):126 """127 Verify that you can't change a library's type, even if it would normally be valid,128 when there are unpublished changes. This is so that a reversion of blocks won't cause an inconsistency.129 """130 lib = self._create_library(131 slug='resolute', title="A complex library", description="Unconvertable", library_type=COMPLEX,132 )133 self._add_block_to_library(lib['id'], "video", 'vid-block')134 result = self._update_library(lib['id'], type=VIDEO, expect_response=400)135 self.assertIn('type', result)136 def test_no_convert_on_pending_deletes(self):137 """138 Verify that you can't change a library's type, even if it would normally be valid,139 when there are unpublished changes. This is so that a reversion of blocks won't cause an inconsistency.140 """141 lib = self._create_library(142 slug='still-alive', title="A complex library", description="Unconvertable", library_type=COMPLEX,143 )144 block = self._add_block_to_library(lib['id'], "video", 'vid-block')145 self._commit_library_changes(lib['id'])146 self._delete_library_block(block['id'])147 result = self._update_library(lib['id'], type=VIDEO, expect_response=400)148 self.assertIn('type', result)149 def test_library_validation(self):150 """151 You can't create a library with the same slug as an existing library,152 or an invalid slug.153 """154 self._create_library(slug="some-slug", title="Existing Library")155 self._create_library(slug="some-slug", title="Duplicate Library", expect_response=400)156 self._create_library(slug="Invalid Slug!", title="Library with Bad Slug", expect_response=400)157 @ddt.data(True, False)158 @patch("openedx.core.djangoapps.content_libraries.views.LibraryApiPagination.page_size", new=2)159 def test_list_library(self, is_indexing_enabled):160 """161 Test the /libraries API and its pagination162 """163 with override_settings(FEATURES={**settings.FEATURES, 'ENABLE_CONTENT_LIBRARY_INDEX': is_indexing_enabled}):164 lib1 = self._create_library(slug="some-slug-1", title="Existing Library")165 lib2 = self._create_library(slug="some-slug-2", title="Existing Library")166 if not is_indexing_enabled:167 lib1['num_blocks'] = lib2['num_blocks'] = None168 lib1['last_published'] = lib2['last_published'] = None169 lib1['has_unpublished_changes'] = lib2['has_unpublished_changes'] = None170 lib1['has_unpublished_deletes'] = lib2['has_unpublished_deletes'] = None171 result = self._list_libraries()172 self.assertEqual(len(result), 2)173 self.assertIn(lib1, result)174 self.assertIn(lib2, result)175 result = self._list_libraries({'pagination': 'true'})176 self.assertEqual(len(result['results']), 2)177 self.assertEqual(result['next'], None)178 # Create another library which causes number of libraries to exceed the page size179 self._create_library(slug="some-slug-3", title="Existing Library")180 # Verify that if `pagination` param isn't sent, API still honors the max page size.181 # This is for maintaining compatibility with older non pagination-aware clients.182 result = self._list_libraries()183 self.assertEqual(len(result), 2)184 # Pagination enabled:185 # Verify total elements and valid 'next' in page 1186 result = self._list_libraries({'pagination': 'true'})187 self.assertEqual(len(result['results']), 2)188 self.assertIn('page=2', result['next'])189 self.assertIn('pagination=true', result['next'])190 # Verify total elements and null 'next' in page 2191 result = self._list_libraries({'pagination': 'true', 'page': '2'})192 self.assertEqual(len(result['results']), 1)193 self.assertEqual(result['next'], None)194 @ddt.data(True, False)195 def test_library_filters(self, is_indexing_enabled):196 """197 Test the filters in the list libraries API198 """199 with override_settings(FEATURES={**settings.FEATURES, 'ENABLE_CONTENT_LIBRARY_INDEX': is_indexing_enabled}):200 self._create_library(slug="test-lib1", title="Foo", description="Bar", library_type=VIDEO)201 self._create_library(slug="test-lib2", title="Library-Title-2", description="Bar2")202 self._create_library(slug="l3", title="Library-Title-3", description="Description", library_type=VIDEO)203 Organization.objects.get_or_create(204 short_name="org-test",205 defaults={"name": "Content Libraries Tachyon Exploration & Survey Team"},206 )207 self._create_library(208 slug="l4", title="Library-Title-4", description="Library-Description", org='org-test',209 library_type=VIDEO,210 )211 self._create_library(slug="l5", title="Library-Title-5", description="Library-Description", org='org-test')212 self.assertEqual(len(self._list_libraries()), 5)213 self.assertEqual(len(self._list_libraries({'org': 'org-test'})), 2)214 self.assertEqual(len(self._list_libraries({'text_search': 'test-lib'})), 2)215 self.assertEqual(len(self._list_libraries({'text_search': 'test-lib', 'type': VIDEO})), 1)216 self.assertEqual(len(self._list_libraries({'text_search': 'library-title'})), 4)217 self.assertEqual(len(self._list_libraries({'text_search': 'library-title', 'type': VIDEO})), 2)218 self.assertEqual(len(self._list_libraries({'text_search': 'bar'})), 2)219 self.assertEqual(len(self._list_libraries({'text_search': 'org-tes'})), 2)220 self.assertEqual(len(self._list_libraries({'org': 'org-test', 'text_search': 'library-title-4'})), 1)221 self.assertEqual(len(self._list_libraries({'type': VIDEO})), 3)222 # General Content Library XBlock tests:223 def test_library_blocks(self):224 """225 Test the happy path of creating and working with XBlocks in a content226 library.227 """228 lib = self._create_library(slug="testlib1", title="A Test Library", description="Testing XBlocks")229 lib_id = lib["id"]230 self.assertEqual(lib["has_unpublished_changes"], False)231 # A library starts out empty:232 self.assertEqual(self._get_library_blocks(lib_id), [])233 # Add a 'problem' XBlock to the library:234 block_data = self._add_block_to_library(lib_id, "problem", "problem1")235 self.assertDictContainsEntries(block_data, {236 "id": "lb:CL-TEST:testlib1:problem:problem1",237 "display_name": "Blank Advanced Problem",238 "block_type": "problem",239 "has_unpublished_changes": True,240 })241 block_id = block_data["id"]242 # Confirm that the result contains a definition key, but don't check its value,243 # which for the purposes of these tests is an implementation detail.244 self.assertIn("def_key", block_data)245 # now the library should contain one block and have unpublished changes:246 self.assertEqual(self._get_library_blocks(lib_id), [block_data])247 self.assertEqual(self._get_library(lib_id)["has_unpublished_changes"], True)248 # Publish the changes:249 self._commit_library_changes(lib_id)250 self.assertEqual(self._get_library(lib_id)["has_unpublished_changes"], False)251 # And now the block information should also show that block has no unpublished changes:252 block_data["has_unpublished_changes"] = False253 self.assertDictContainsEntries(self._get_library_block(block_id), block_data)254 self.assertEqual(self._get_library_blocks(lib_id), [block_data])255 # Now update the block's OLX:256 orig_olx = self._get_library_block_olx(block_id)257 self.assertIn("<problem", orig_olx)258 new_olx = """259 <problem display_name="New Multi Choice Question" max_attempts="5">260 <multiplechoiceresponse>261 <p>This is a normal capa problem with unicode 🔥. It has "maximum attempts" set to **5**.</p>262 <label>Blockstore is designed to store.</label>263 <choicegroup type="MultipleChoice">264 <choice correct="false">XBlock metadata only</choice>265 <choice correct="true">XBlock data/metadata and associated static asset files</choice>266 <choice correct="false">Static asset files for XBlocks and courseware</choice>267 <choice correct="false">XModule metadata only</choice>268 </choicegroup>269 </multiplechoiceresponse>270 </problem>271 """.strip()272 self._set_library_block_olx(block_id, new_olx)273 # now reading it back, we should get that exact OLX (no change to whitespace etc.):274 self.assertEqual(self._get_library_block_olx(block_id), new_olx)275 # And the display name and "unpublished changes" status of the block should be updated:276 self.assertDictContainsEntries(self._get_library_block(block_id), {277 "display_name": "New Multi Choice Question",278 "has_unpublished_changes": True,279 })280 # Now view the XBlock's student_view (including draft changes):281 fragment = self._render_block_view(block_id, "student_view")282 self.assertIn("resources", fragment)283 self.assertIn("Blockstore is designed to store.", fragment["content"])284 # Also call a handler to make sure that's working:285 handler_url = self._get_block_handler_url(block_id, "xmodule_handler") + "problem_get"286 problem_get_response = self.client.get(handler_url)287 self.assertEqual(problem_get_response.status_code, 200)288 self.assertIn("You have used 0 of 5 attempts", problem_get_response.content.decode('utf-8'))289 # Now delete the block:290 self.assertEqual(self._get_library(lib_id)["has_unpublished_deletes"], False)291 self._delete_library_block(block_id)292 # Confirm it's deleted:293 self._render_block_view(block_id, "student_view", expect_response=404)294 self._get_library_block(block_id, expect_response=404)295 self.assertEqual(self._get_library(lib_id)["has_unpublished_deletes"], True)296 # Now revert all the changes back until the last publish:297 self._revert_library_changes(lib_id)298 self.assertEqual(self._get_library(lib_id)["has_unpublished_deletes"], False)299 self.assertEqual(self._get_library_block_olx(block_id), orig_olx)300 # fin301 @ddt.data(True, False)302 @patch("openedx.core.djangoapps.content_libraries.views.LibraryApiPagination.page_size", new=2)303 def test_list_library_blocks(self, is_indexing_enabled):304 """305 Test the /libraries/{lib_key_str}/blocks API and its pagination306 """307 with override_settings(FEATURES={**settings.FEATURES, 'ENABLE_CONTENT_LIBRARY_INDEX': is_indexing_enabled}):308 lib = self._create_library(slug="list_blocks-slug" + str(is_indexing_enabled), title="Library 1")309 block1 = self._add_block_to_library(lib["id"], "problem", "problem1")310 block2 = self._add_block_to_library(lib["id"], "unit", "unit1")311 self._add_block_to_library(lib["id"], "problem", "problem2", parent_block=block2["id"])312 result = self._get_library_blocks(lib["id"])313 self.assertEqual(len(result), 2)314 self.assertIn(block1, result)315 result = self._get_library_blocks(lib["id"], {'pagination': 'true'})316 self.assertEqual(len(result['results']), 2)317 self.assertEqual(result['next'], None)318 self._add_block_to_library(lib["id"], "problem", "problem3")319 # Test pagination320 result = self._get_library_blocks(lib["id"])321 self.assertEqual(len(result), 3)322 result = self._get_library_blocks(lib["id"], {'pagination': 'true'})323 self.assertEqual(len(result['results']), 2)324 self.assertIn('page=2', result['next'])325 self.assertIn('pagination=true', result['next'])326 result = self._get_library_blocks(lib["id"], {'pagination': 'true', 'page': '2'})327 self.assertEqual(len(result['results']), 1)328 self.assertEqual(result['next'], None)329 @ddt.data(True, False)330 def test_library_blocks_filters(self, is_indexing_enabled):331 """332 Test the filters in the list libraries API333 """334 with override_settings(FEATURES={**settings.FEATURES, 'ENABLE_CONTENT_LIBRARY_INDEX': is_indexing_enabled}):335 lib = self._create_library(slug="test-lib-blocks" + str(is_indexing_enabled), title="Title")336 block1 = self._add_block_to_library(lib["id"], "problem", "foo-bar")337 self._add_block_to_library(lib["id"], "video", "vid-baz")338 self._add_block_to_library(lib["id"], "html", "html-baz")339 self._add_block_to_library(lib["id"], "problem", "foo-baz")340 self._add_block_to_library(lib["id"], "problem", "bar-baz")341 self._set_library_block_olx(block1["id"], "<problem display_name=\"DisplayName\"></problem>")342 self.assertEqual(len(self._get_library_blocks(lib["id"])), 5)343 self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Foo'})), 2)344 self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Display'})), 1)345 self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Video'})), 1)346 self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Foo', 'block_type': 'video'})), 0)347 self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Baz', 'block_type': 'video'})), 1)348 self.assertEqual(len(349 self._get_library_blocks(lib["id"], {'text_search': 'Baz', 'block_type': ['video', 'html']})),350 2,351 )352 self.assertEqual(len(self._get_library_blocks(lib["id"], {'block_type': 'video'})), 1)353 self.assertEqual(len(self._get_library_blocks(lib["id"], {'block_type': 'problem'})), 3)354 self.assertEqual(len(self._get_library_blocks(lib["id"], {'block_type': 'squirrel'})), 0)355 @ddt.data(356 ('video-problem', VIDEO, 'problem', 400),357 ('video-video', VIDEO, 'video', 200),358 ('problem-problem', PROBLEM, 'problem', 200),359 ('problem-video', PROBLEM, 'video', 400),360 ('complex-video', COMPLEX, 'video', 200),361 ('complex-problem', COMPLEX, 'problem', 200),362 )363 @ddt.unpack364 def test_library_blocks_type_constrained(self, slug, library_type, block_type, expect_response):365 """366 Test that type-constrained libraries enforce their constraint when adding an XBlock.367 """368 lib = self._create_library(369 slug=slug, title="A Test Library", description="Testing XBlocks", library_type=library_type,370 )371 lib_id = lib["id"]372 # Add a 'problem' XBlock to the library:373 self._add_block_to_library(lib_id, block_type, 'test-block', expect_response=expect_response)374 def test_library_blocks_with_hierarchy(self):375 """376 Test library blocks with children377 """378 lib = self._create_library(slug="hierarchy_test_lib", title="A Test Library")379 lib_id = lib["id"]380 # Add a 'unit' XBlock to the library:381 unit_block = self._add_block_to_library(lib_id, "unit", "unit1")382 # Add an HTML child block:383 child1 = self._add_block_to_library(lib_id, "html", "html1", parent_block=unit_block["id"])384 self._set_library_block_olx(child1["id"], "<html>Hello world</html>")385 # Add a problem child block:386 child2 = self._add_block_to_library(lib_id, "problem", "problem1", parent_block=unit_block["id"])387 self._set_library_block_olx(child2["id"], """388 <problem><multiplechoiceresponse>389 <p>What is an even number?</p>390 <choicegroup type="MultipleChoice">391 <choice correct="false">3</choice>392 <choice correct="true">2</choice>393 </choicegroup>394 </multiplechoiceresponse></problem>395 """)396 # Check the resulting OLX of the unit:397 self.assertEqual(self._get_library_block_olx(unit_block["id"]), (398 '<unit xblock-family="xblock.v1">\n'399 ' <xblock-include definition="html/html1"/>\n'400 ' <xblock-include definition="problem/problem1"/>\n'401 '</unit>\n'402 ))403 # The unit can see and render its children:404 fragment = self._render_block_view(unit_block["id"], "student_view")405 self.assertIn("Hello world", fragment["content"])406 self.assertIn("What is an even number?", fragment["content"])407 # We cannot add a duplicate ID to the library, either at the top level or as a child:408 self._add_block_to_library(lib_id, "problem", "problem1", expect_response=400)409 self._add_block_to_library(lib_id, "problem", "problem1", parent_block=unit_block["id"], expect_response=400)410 # Test that permissions are enforced for content libraries411 def test_library_permissions(self): # pylint: disable=too-many-statements412 """413 Test that permissions are enforced for content libraries, and that414 permissions can be read and manipulated using the REST API (which in415 turn tests the python API).416 This is a single giant test case, because that optimizes for the fastest417 test run time, even though it can make debugging failures harder.418 """419 # Create a few users to use for all of these tests:420 admin = UserFactory.create(username="Admin", email="admin@example.com")421 author = UserFactory.create(username="Author", email="author@example.com")422 reader = UserFactory.create(username="Reader", email="reader@example.com")423 group = Group.objects.create(name="group1")424 author_group_member = UserFactory.create(username="GroupMember", email="groupmember@example.com")425 author_group_member.groups.add(group)426 random_user = UserFactory.create(username="Random", email="random@example.com")427 never_added = UserFactory.create(username="Never", email="never@example.com")428 # Library CRUD #########################################################429 # Create a library, owned by "Admin"430 with self.as_user(admin):431 lib = self._create_library(slug="permtest", title="Permission Test Library", description="Testing")432 lib_id = lib["id"]433 # By default, "public learning" and public read access are disallowed.434 self.assertEqual(lib["allow_public_learning"], False)435 self.assertEqual(lib["allow_public_read"], False)436 # By default, the creator of a new library is the only admin437 data = self._get_library_team(lib_id)438 self.assertEqual(len(data), 1)439 self.assertDictContainsEntries(data[0], {440 "username": admin.username, "group_name": None, "access_level": "admin",441 })442 # Add the other users to the content library:443 self._set_user_access_level(lib_id, author.username, access_level="author")444 # Delete it, add it again.445 self._remove_user_access(lib_id, author.username)446 self._set_user_access_level(lib_id, author.username, access_level="author")447 # Add one of them via the email-based creation endpoint.448 self._add_user_by_email(lib_id, reader.email, access_level="read")449 self._set_group_access_level(lib_id, group.name, access_level="author")450 team_response = self._get_library_team(lib_id)451 self.assertEqual(len(team_response), 4)452 # We'll use this one later.453 reader_grant = {"username": reader.username, "group_name": None, "access_level": "read"}454 # The response should also always be sorted in a specific order (by username and group name):455 expected_response = [456 {"username": None, "group_name": "group1", "access_level": "author"},457 {"username": admin.username, "group_name": None, "access_level": "admin"},458 {"username": author.username, "group_name": None, "access_level": "author"},459 reader_grant,460 ]461 for entry, expected in zip(team_response, expected_response):462 self.assertDictContainsEntries(entry, expected)463 # A random user cannot get the library nor its team:464 with self.as_user(random_user):465 self._get_library(lib_id, expect_response=403)466 self._get_library_team(lib_id, expect_response=403)467 self._add_user_by_email(lib_id, never_added.email, access_level="read", expect_response=403)468 # But every authorized user can:469 for user in [admin, author, author_group_member]:470 with self.as_user(user):471 self._get_library(lib_id)472 data = self._get_library_team(lib_id)473 self.assertEqual(data, team_response)474 data = self._get_user_access_level(lib_id, reader.username)475 self.assertEqual(data, {**reader_grant, 'username': 'Reader', 'email': 'reader@example.com'})476 # A user with only read permission can get data about the library but not the team:477 with self.as_user(reader):478 self._get_library(lib_id)479 self._get_library_team(lib_id, expect_response=403)480 self._get_user_access_level(lib_id, author.username, expect_response=403)481 self._add_user_by_email(lib_id, never_added.email, access_level="read", expect_response=403)482 # Users without admin access cannot delete the library nor change its team:483 for user in [author, reader, author_group_member, random_user]:484 with self.as_user(user):485 self._delete_library(lib_id, expect_response=403)486 self._set_user_access_level(lib_id, author.username, access_level="admin", expect_response=403)487 self._set_user_access_level(lib_id, admin.username, access_level=None, expect_response=403)488 self._set_user_access_level(lib_id, random_user.username, access_level="read", expect_response=403)489 self._remove_user_access(lib_id, admin.username, expect_response=403)490 self._add_user_by_email(lib_id, never_added.email, access_level="read", expect_response=403)491 # Users with author access (or higher) can edit the library's properties:492 with self.as_user(author):493 self._update_library(lib_id, description="Revised description")494 with self.as_user(author_group_member):495 self._update_library(lib_id, title="New Library Title")496 # But other users cannot:497 with self.as_user(reader):498 self._update_library(lib_id, description="Prohibited description", expect_response=403)499 with self.as_user(random_user):500 self._update_library(lib_id, title="I can't set this title", expect_response=403)501 # Verify the permitted changes were made:502 with self.as_user(admin):503 data = self._get_library(lib_id)504 self.assertEqual(data["description"], "Revised description")505 self.assertEqual(data["title"], "New Library Title")506 # Library XBlock editing ###############################################507 # users with read permission or less cannot add blocks:508 for user in [reader, random_user]:509 with self.as_user(user):510 self._add_block_to_library(lib_id, "problem", "problem1", expect_response=403)511 # But authors and admins can:512 with self.as_user(admin):513 self._add_block_to_library(lib_id, "problem", "problem1")514 with self.as_user(author):515 self._add_block_to_library(lib_id, "problem", "problem2")516 with self.as_user(author_group_member):517 block3_data = self._add_block_to_library(lib_id, "problem", "problem3")518 block3_key = block3_data["id"]519 # At this point, the library contains 3 draft problem XBlocks.520 # A random user cannot read OLX nor assets (this library has allow_public_read False):521 with self.as_user(random_user):522 self._get_library_block_olx(block3_key, expect_response=403)523 self._get_library_block_assets(block3_key, expect_response=403)524 self._get_library_block_asset(block3_key, file_name="whatever.png", expect_response=403)525 # But if we grant allow_public_read, then they can:526 with self.as_user(admin):527 self._update_library(lib_id, allow_public_read=True)528 self._set_library_block_asset(block3_key, "whatever.png", b"data")529 with self.as_user(random_user):530 self._get_library_block_olx(block3_key)531 self._get_library_block_assets(block3_key)532 self._get_library_block_asset(block3_key, file_name="whatever.png")533 # Users without authoring permission cannot edit nor delete XBlocks (this library has allow_public_read False):534 for user in [reader, random_user]:535 with self.as_user(user):536 self._set_library_block_olx(block3_key, "<problem/>", expect_response=403)537 self._set_library_block_asset(block3_key, "test.txt", b"data", expect_response=403)538 self._delete_library_block(block3_key, expect_response=403)539 self._commit_library_changes(lib_id, expect_response=403)540 self._revert_library_changes(lib_id, expect_response=403)541 # But users with author permission can:542 with self.as_user(author_group_member):543 olx = self._get_library_block_olx(block3_key)544 self._set_library_block_olx(block3_key, olx)545 self._get_library_block_assets(block3_key)546 self._set_library_block_asset(block3_key, "test.txt", b"data")547 self._get_library_block_asset(block3_key, file_name="test.txt")548 self._delete_library_block(block3_key)549 self._commit_library_changes(lib_id)550 self._revert_library_changes(lib_id) # This is a no-op after the commit, but should still have 200 response551 def test_no_lockout(self):552 """553 Test that administrators cannot be removed if they are the only administrator granted access.554 """555 admin = UserFactory.create(username="Admin", email="admin@example.com")556 successor = UserFactory.create(username="Successor", email="successor@example.com")557 with self.as_user(admin):558 lib = self._create_library(slug="permtest", title="Permission Test Library", description="Testing")559 # Fail to downgrade permissions.560 self._remove_user_access(lib_key=lib['id'], username=admin.username, expect_response=400)561 # Promote another user.562 self._set_user_access_level(563 lib_key=lib['id'], username=successor.username, access_level="admin",564 )565 self._remove_user_access(lib_key=lib['id'], username=admin.username)566 def test_library_blocks_with_links(self):567 """568 Test that libraries can link to XBlocks in other content libraries569 """570 # Create a problem bank:571 bank_lib = self._create_library(slug="problem_bank", title="Problem Bank")572 bank_lib_id = bank_lib["id"]573 # Add problem1 to the problem bank:574 p1 = self._add_block_to_library(bank_lib_id, "problem", "problem1")575 self._set_library_block_olx(p1["id"], """576 <problem><multiplechoiceresponse>577 <p>What is an even number?</p>578 <choicegroup type="MultipleChoice">579 <choice correct="false">3</choice>580 <choice correct="true">2</choice>581 </choicegroup>582 </multiplechoiceresponse></problem>583 """)584 # Commit the changes, creating version 1:585 self._commit_library_changes(bank_lib_id)586 # Now update problem 1 and create a new problem 2:587 self._set_library_block_olx(p1["id"], """588 <problem><multiplechoiceresponse>589 <p>What is an odd number?</p>590 <choicegroup type="MultipleChoice">591 <choice correct="true">3</choice>592 <choice correct="false">2</choice>593 </choicegroup>594 </multiplechoiceresponse></problem>595 """)596 p2 = self._add_block_to_library(bank_lib_id, "problem", "problem2")597 self._set_library_block_olx(p2["id"], """598 <problem><multiplechoiceresponse>599 <p>What holds this XBlock?</p>600 <choicegroup type="MultipleChoice">601 <choice correct="false">A course</choice>602 <choice correct="true">A problem bank</choice>603 </choicegroup>604 </multiplechoiceresponse></problem>605 """)606 # Commit the changes, creating version 2:607 self._commit_library_changes(bank_lib_id)608 # At this point, bank_lib contains two problems and has two versions.609 # In version 1, problem1 is "What is an event number", and in version 2 it's "What is an odd number".610 # Problem2 exists only in version 2 and asks "What holds this XBlock?"611 lib = self._create_library(slug="links_test_lib", title="Link Test Library")612 lib_id = lib["id"]613 # Link to the problem bank:614 self._link_to_library(lib_id, "problem_bank", bank_lib_id)615 self._link_to_library(lib_id, "problem_bank_v1", bank_lib_id, version=1)616 # Add a 'unit' XBlock to the library:617 unit_block = self._add_block_to_library(lib_id, "unit", "unit1")618 self._set_library_block_olx(unit_block["id"], """619 <unit>620 <!-- version 2 link to "What is an odd number?" -->621 <xblock-include source="problem_bank" definition="problem/problem1"/>622 <!-- version 1 link to "What is an even number?" -->623 <xblock-include source="problem_bank_v1" definition="problem/problem1" usage="p1v1" />624 <!-- link to "What holds this XBlock?" -->625 <xblock-include source="problem_bank" definition="problem/problem2"/>626 </unit>627 """)628 # The unit can see and render its children:629 fragment = self._render_block_view(unit_block["id"], "student_view")630 self.assertIn("What is an odd number?", fragment["content"])631 self.assertIn("What is an even number?", fragment["content"])632 self.assertIn("What holds this XBlock?", fragment["content"])633 # Also check the API for retrieving links:634 links_created = self._get_library_links(lib_id)635 links_created.sort(key=lambda link: link["id"])636 self.assertEqual(len(links_created), 2)637 self.assertEqual(links_created[0]["id"], "problem_bank")638 self.assertEqual(links_created[0]["bundle_uuid"], bank_lib["bundle_uuid"])639 self.assertEqual(links_created[0]["version"], 2)640 self.assertEqual(links_created[0]["latest_version"], 2)641 self.assertEqual(links_created[0]["opaque_key"], bank_lib_id)642 self.assertEqual(links_created[1]["id"], "problem_bank_v1")643 self.assertEqual(links_created[1]["bundle_uuid"], bank_lib["bundle_uuid"])644 self.assertEqual(links_created[1]["version"], 1)645 self.assertEqual(links_created[1]["latest_version"], 2)646 self.assertEqual(links_created[1]["opaque_key"], bank_lib_id)647 def test_library_blocks_limit(self):648 """649 Test that libraries don't allow more than specified blocks650 """651 with self.settings(MAX_BLOCKS_PER_CONTENT_LIBRARY=1):652 lib = self._create_library(slug="test_lib_limits", title="Limits Test Library", description="Testing XBlocks limits in a library")653 lib_id = lib["id"]654 block_data = self._add_block_to_library(lib_id, "unit", "unit1")655 # Second block should throw error656 self._add_block_to_library(lib_id, "problem", "problem1", expect_response=400)657 # Also check that limit applies to child blocks too658 self._add_block_to_library(lib_id, "html", "html1", parent_block=block_data['id'], expect_response=400)659 @ddt.data(660 ('complex-types', COMPLEX, False),661 ('video-types', VIDEO, True),662 ('problem-types', PROBLEM, True),663 )664 @ddt.unpack665 def test_block_types(self, slug, library_type, constrained):666 """667 Test that the permitted block types listing for a library change based on type.668 """669 lib = self._create_library(slug=slug, title='Test Block Types', library_type=library_type)670 types = self._get_library_block_types(lib['id'])671 if constrained:672 self.assertEqual(len(types), 1)673 self.assertEqual(types[0]['block_type'], library_type)674 else:...

Full Screen

Full Screen

base.py

Source:base.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""3Tests for Blockstore-based Content Libraries4"""5from contextlib import contextmanager6from io import BytesIO7from urllib.parse import urlencode8import unittest9from django.conf import settings10from django.test.utils import override_settings11from mock import patch12from organizations.models import Organization13from rest_framework.test import APITestCase, APIClient14from search.search_engine_base import SearchEngine15from common.djangoapps.student.tests.factories import UserFactory16from openedx.core.djangoapps.content_libraries.libraries_index import MAX_SIZE17from openedx.core.djangoapps.content_libraries.constants import COMPLEX, ALL_RIGHTS_RESERVED18from openedx.core.djangolib.testing.utils import skip_unless_cms19from openedx.core.lib import blockstore_api20# Define the URLs here - don't use reverse() because we want to detect21# backwards-incompatible changes like changed URLs.22URL_PREFIX = '/api/libraries/v2/'23URL_LIB_CREATE = URL_PREFIX24URL_LIB_LIST = URL_PREFIX + '?{query_params}'25URL_LIB_DETAIL = URL_PREFIX + '{lib_key}/' # Get data about a library, update or delete library26URL_LIB_BLOCK_TYPES = URL_LIB_DETAIL + 'block_types/' # Get the list of XBlock types that can be added to this library27URL_LIB_LINKS = URL_LIB_DETAIL + 'links/' # Get the list of links in this library, or add a new one28URL_LIB_COMMIT = URL_LIB_DETAIL + 'commit/' # Commit (POST) or revert (DELETE) all pending changes to this library29URL_LIB_BLOCKS = URL_LIB_DETAIL + 'blocks/' # Get the list of XBlocks in this library, or add a new one30URL_LIB_TEAM = URL_LIB_DETAIL + 'team/' # Get the list of users/groups authorized to use this library31URL_LIB_TEAM_USER = URL_LIB_TEAM + 'user/{username}/' # Add/edit/remove a user's permission to use this library32URL_LIB_TEAM_GROUP = URL_LIB_TEAM + 'group/{group_name}/' # Add/edit/remove a group's permission to use this library33URL_LIB_BLOCK = URL_PREFIX + 'blocks/{block_key}/' # Get data about a block, or delete it34URL_LIB_BLOCK_OLX = URL_LIB_BLOCK + 'olx/' # Get or set the OLX of the specified XBlock35URL_LIB_BLOCK_ASSETS = URL_LIB_BLOCK + 'assets/' # List the static asset files of the specified XBlock36URL_LIB_BLOCK_ASSET_FILE = URL_LIB_BLOCK + 'assets/{file_name}' # Get, delete, or upload a specific static asset file37URL_BLOCK_RENDER_VIEW = '/api/xblock/v2/xblocks/{block_key}/view/{view_name}/'38URL_BLOCK_GET_HANDLER_URL = '/api/xblock/v2/xblocks/{block_key}/handler_url/{handler_name}/'39URL_BLOCK_METADATA_URL = '/api/xblock/v2/xblocks/{block_key}/'40# Decorator for tests that require blockstore41requires_blockstore = unittest.skipUnless(settings.RUN_BLOCKSTORE_TESTS, "Requires a running Blockstore server")42def elasticsearch_test(func):43 """44 Decorator for tests which connect to elasticsearch when needed45 """46 # This is disabled by default. Set to True if the elasticsearch engine is needed to test parts of code.47 if settings.ENABLE_ELASTICSEARCH_FOR_TESTS:48 func = override_settings(SEARCH_ENGINE="search.elastic.ElasticSearchEngine")(func)49 func = override_settings(ELASTIC_SEARCH_CONFIG=[{50 'use_ssl': settings.TEST_ELASTICSEARCH_USE_SSL,51 'host': settings.TEST_ELASTICSEARCH_HOST,52 'port': settings.TEST_ELASTICSEARCH_PORT,53 }])(func)54 func = patch("openedx.core.djangoapps.content_libraries.libraries_index.SearchIndexerBase.SEARCH_KWARGS", new={55 'refresh': 'wait_for'56 })(func)57 return func58 else:59 @classmethod60 def mock_perform(cls, filter_terms, text_search):61 # pylint: disable=no-member62 return SearchEngine.get_search_engine(cls.INDEX_NAME).search(63 doc_type=cls.DOCUMENT_TYPE,64 field_dictionary=filter_terms,65 query_string=text_search,66 size=MAX_SIZE67 )68 func = patch(69 "openedx.core.djangoapps.content_libraries.libraries_index.SearchIndexerBase.SEARCH_KWARGS",70 new={}71 )(func)72 func = patch(73 "openedx.core.djangoapps.content_libraries.libraries_index.SearchIndexerBase._perform_elastic_search",74 new=mock_perform75 )(func)76 return func77@requires_blockstore78@skip_unless_cms # Content Libraries REST API is only available in Studio79class ContentLibrariesRestApiTest(APITestCase):80 """81 Base class for Blockstore-based Content Libraries test that use the REST API82 These tests use the REST API, which in turn relies on the Python API.83 Some tests may use the python API directly if necessary to provide84 coverage of any code paths not accessible via the REST API.85 In general, these tests should86 (1) Use public APIs only - don't directly create data using other methods,87 which results in a less realistic test and ties the test suite too88 closely to specific implementation details.89 (Exception: users can be provisioned using a user factory)90 (2) Assert that fields are present in responses, but don't assert that the91 entire response has some specific shape. That way, things like adding92 new fields to an API response, which are backwards compatible, won't93 break any tests, but backwards-incompatible API changes will.94 WARNING: every test should have a unique library slug, because even though95 the django/mysql database gets reset for each test case, the lookup between96 library slug and bundle UUID does not because it's assumed to be immutable97 and cached forever.98 """99 @classmethod100 def setUpClass(cls):101 super().setUpClass()102 cls.user = UserFactory.create(username="Bob", email="bob@example.com", password="edx")103 # Create a collection using Blockstore API directly only because there104 # is not yet any Studio REST API for doing so:105 cls.collection = blockstore_api.create_collection("Content Library Test Collection")106 # Create an organization107 cls.organization, _ = Organization.objects.get_or_create(108 short_name="CL-TEST",109 defaults={"name": "Content Libraries Tachyon Exploration & Survey Team"},110 )111 def setUp(self):112 super().setUp()113 self.clients_by_user = {}114 self.client.login(username=self.user.username, password="edx")115 # Assertions116 def assertDictContainsEntries(self, big_dict, subset_dict):117 """118 Assert that the first dict contains at least all of the same entries as119 the second dict.120 Like python 2's assertDictContainsSubset, but with the arguments in the121 correct order.122 """123 self.assertGreaterEqual(big_dict.items(), subset_dict.items())124 # API helpers125 def _api(self, method, url, data, expect_response):126 """127 Call a REST API128 """129 response = getattr(self.client, method)(url, data, format="json")130 self.assertEqual(131 response.status_code, expect_response,132 "Unexpected response code {}:\n{}".format(response.status_code, getattr(response, 'data', '(no data)')),133 )134 return response.data135 @contextmanager136 def as_user(self, user):137 """138 Context manager to call the REST API as a user other than self.user139 """140 old_client = self.client141 if user not in self.clients_by_user:142 client = self.clients_by_user[user] = APIClient()143 client.force_authenticate(user=user)144 self.client = self.clients_by_user[user] # pylint: disable=attribute-defined-outside-init145 yield146 self.client = old_client # pylint: disable=attribute-defined-outside-init147 def _create_library(148 self, slug, title, description="", org=None, library_type=COMPLEX,149 license_type=ALL_RIGHTS_RESERVED, expect_response=200,150 ):151 """ Create a library """152 if org is None:153 org = self.organization.short_name154 return self._api('post', URL_LIB_CREATE, {155 "org": org,156 "slug": slug,157 "title": title,158 "description": description,159 "type": library_type,160 "license": license_type,161 "collection_uuid": str(self.collection.uuid),162 }, expect_response)163 def _list_libraries(self, query_params_dict=None, expect_response=200):164 """ List libraries """165 if query_params_dict is None:166 query_params_dict = {}167 return self._api('get', URL_LIB_LIST.format(query_params=urlencode(query_params_dict)), None, expect_response)168 def _get_library(self, lib_key, expect_response=200):169 """ Get a library """170 return self._api('get', URL_LIB_DETAIL.format(lib_key=lib_key), None, expect_response)171 def _update_library(self, lib_key, expect_response=200, **data):172 """ Update an existing library """173 return self._api('patch', URL_LIB_DETAIL.format(lib_key=lib_key), data, expect_response)174 def _delete_library(self, lib_key, expect_response=200):175 """ Delete an existing library """176 return self._api('delete', URL_LIB_DETAIL.format(lib_key=lib_key), None, expect_response)177 def _get_library_links(self, lib_key):178 """ Get the links of the specified content library """179 return self._api('get', URL_LIB_LINKS.format(lib_key=lib_key), None, expect_response=200)180 def _link_to_library(self, lib_key, link_id, other_library_key, version=None):181 """182 Modify the library identified by lib_key to create a named link to183 other_library_key. This allows you to use XBlocks from other_library in184 lib. Optionally specify a version to link to.185 """186 data = {187 "id": link_id,188 "opaque_key": other_library_key,189 "version": version,190 }191 return self._api('post', URL_LIB_LINKS.format(lib_key=lib_key), data, expect_response=200)192 def _commit_library_changes(self, lib_key, expect_response=200):193 """ Commit changes to an existing library """194 return self._api('post', URL_LIB_COMMIT.format(lib_key=lib_key), None, expect_response)195 def _revert_library_changes(self, lib_key, expect_response=200):196 """ Revert pending changes to an existing library """197 return self._api('delete', URL_LIB_COMMIT.format(lib_key=lib_key), None, expect_response)198 def _get_library_team(self, lib_key, expect_response=200):199 """ Get the list of users/groups authorized to use this library """200 return self._api('get', URL_LIB_TEAM.format(lib_key=lib_key), None, expect_response)201 def _get_user_access_level(self, lib_key, username, expect_response=200):202 """ Fetch a user's access level """203 url = URL_LIB_TEAM_USER.format(lib_key=lib_key, username=username)204 return self._api('get', url, None, expect_response)205 def _add_user_by_email(self, lib_key, email, access_level, expect_response=200):206 """ Add a user of a specified permission level by their email address. """207 url = URL_LIB_TEAM.format(lib_key=lib_key)208 return self._api('post', url, {"access_level": access_level, "email": email}, expect_response)209 def _set_user_access_level(self, lib_key, username, access_level, expect_response=200):210 """ Change the specified user's access level """211 url = URL_LIB_TEAM_USER.format(lib_key=lib_key, username=username)212 return self._api('put', url, {"access_level": access_level}, expect_response)213 def _remove_user_access(self, lib_key, username, expect_response=200):214 """ Should effectively be the same as the above with access_level=None, but using the delete HTTP verb. """215 url = URL_LIB_TEAM_USER.format(lib_key=lib_key, username=username)216 return self._api('delete', url, None, expect_response)217 def _set_group_access_level(self, lib_key, group_name, access_level, expect_response=200):218 """ Change the specified group's access level """219 url = URL_LIB_TEAM_GROUP.format(lib_key=lib_key, group_name=group_name)220 if access_level is None:221 return self._api('delete', url, None, expect_response)222 else:223 return self._api('put', url, {"access_level": access_level}, expect_response)224 def _get_library_block_types(self, lib_key, expect_response=200):225 """ Get the list of permitted XBlocks for this library """226 return self._api('get', URL_LIB_BLOCK_TYPES.format(lib_key=lib_key), None, expect_response)227 def _get_library_blocks(self, lib_key, query_params_dict=None, expect_response=200):228 """ Get the list of XBlocks in the library """229 if query_params_dict is None:230 query_params_dict = {}231 return self._api(232 'get',233 URL_LIB_BLOCKS.format(lib_key=lib_key) + '?' + urlencode(query_params_dict, doseq=True),234 None,235 expect_response236 )237 def _add_block_to_library(self, lib_key, block_type, slug, parent_block=None, expect_response=200):238 """ Add a new XBlock to the library """239 data = {"block_type": block_type, "definition_id": slug}240 if parent_block:241 data["parent_block"] = parent_block242 return self._api('post', URL_LIB_BLOCKS.format(lib_key=lib_key), data, expect_response)243 def _get_library_block(self, block_key, expect_response=200):244 """ Get a specific block in the library """245 return self._api('get', URL_LIB_BLOCK.format(block_key=block_key), None, expect_response)246 def _delete_library_block(self, block_key, expect_response=200):247 """ Delete a specific block from the library """248 self._api('delete', URL_LIB_BLOCK.format(block_key=block_key), None, expect_response)249 def _get_library_block_olx(self, block_key, expect_response=200):250 """ Get the OLX of a specific block in the library """251 result = self._api('get', URL_LIB_BLOCK_OLX.format(block_key=block_key), None, expect_response)252 if expect_response == 200:253 return result["olx"]254 return result255 def _set_library_block_olx(self, block_key, new_olx, expect_response=200):256 """ Overwrite the OLX of a specific block in the library """257 return self._api('post', URL_LIB_BLOCK_OLX.format(block_key=block_key), {"olx": new_olx}, expect_response)258 def _get_library_block_assets(self, block_key, expect_response=200):259 """ List the static asset files belonging to the specified XBlock """260 url = URL_LIB_BLOCK_ASSETS.format(block_key=block_key)261 result = self._api('get', url, None, expect_response)262 return result["files"] if expect_response == 200 else result263 def _get_library_block_asset(self, block_key, file_name, expect_response=200):264 """265 Get metadata about one static asset file belonging to the specified266 XBlock.267 """268 url = URL_LIB_BLOCK_ASSET_FILE.format(block_key=block_key, file_name=file_name)269 return self._api('get', url, None, expect_response)270 def _set_library_block_asset(self, block_key, file_name, content, expect_response=200):271 """272 Set/replace a static asset file belonging to the specified XBlock.273 content should be a binary string.274 """275 assert isinstance(content, bytes)276 file_handle = BytesIO(content)277 url = URL_LIB_BLOCK_ASSET_FILE.format(block_key=block_key, file_name=file_name)278 response = self.client.put(url, data={"content": file_handle})279 self.assertEqual(280 response.status_code, expect_response,281 "Unexpected response code {}:\n{}".format(response.status_code, getattr(response, 'data', '(no data)')),282 )283 def _delete_library_block_asset(self, block_key, file_name, expect_response=200):284 """ Delete a static asset file. """285 url = URL_LIB_BLOCK_ASSET_FILE.format(block_key=block_key, file_name=file_name)286 return self._api('delete', url, None, expect_response)287 def _render_block_view(self, block_key, view_name, expect_response=200):288 """289 Render an XBlock's view in the active application's runtime.290 Note that this endpoint has different behavior in Studio (draft mode)291 vs. the LMS (published version only).292 """293 url = URL_BLOCK_RENDER_VIEW.format(block_key=block_key, view_name=view_name)294 return self._api('get', url, None, expect_response)295 def _get_block_handler_url(self, block_key, handler_name):296 """297 Get the URL to call a specific XBlock's handler.298 The URL itself encodes authentication information so can be called299 without session authentication or any other kind of authentication.300 """301 url = URL_BLOCK_GET_HANDLER_URL.format(block_key=block_key, handler_name=handler_name)...

Full Screen

Full Screen

api_test_base.py

Source:api_test_base.py Github

copy

Full Screen

1#*-coding:utf-8-*-2import subprocess3import test_config4from unittest import TestCase5from tablestore import * 6from tablestore.error import * 7from tablestore.retry import *8import types9import math10import time11import restriction12import traceback13import commands14import sys 15import os 16import inspect17import logging18class APITestBase(TestCase):19 def __init__(self, methodName=None):20 TestCase.__init__(self, methodName=methodName)21 self.start_time = 022 self.logger = logging.getLogger('APITestBase') 23 self.logger.setLevel(logging.INFO) 24 25 fh = logging.FileHandler('tablestore_sdk_test.log') 26 fh.setLevel(logging.INFO) 27 28 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') 29 fh.setFormatter(formatter) 30 31 self.logger.addHandler(fh) 32 def setUp(self):33 self.client_test = OTSClient(34 test_config.OTS_ENDPOINT,35 test_config.OTS_ID,36 test_config.OTS_SECRET,37 test_config.OTS_INSTANCE,38 logger_name = 'APITestBase',39 retry_policy=DefaultRetryPolicy(),40 )41 42 time.sleep(1) # to avoid too frequent table operations43 for table_name in self.client_test.list_table():44 if table_name.find(self.get_python_version()) != -1:45 self.client_test.delete_table(table_name)46 def tearDown(self):47 pass48 def case_post_check(self):49 pass50 51 def assert_error(self, error, http_status, error_code, error_message):52 self.assert_equal(error.http_status, http_status)53 self.assert_equal(error.code, error_code)54 self.assert_equal(error.message.encode('utf-8'), error_message)55 def assert_false(self):56 self.logger.warn("\nAssertion Failed\n" + "".join(traceback.format_stack()))57 raise AssertionError58 def assert_equal(self, res, expect_res):59 if res != expect_res:60 #self.logger.warn("\nAssertion Failed\nactual: %s\nexpect: %s\n" % (res.decode('utf-8'), expect_res.decode('utf-8')) + "".join(traceback.format_stack()))61 self.assertEqual(res, expect_res)62 def try_exhaust_cu(self, func, count, read_cu, write_cu):63 i = 064 while True:65 try:66 self._try_exhaust_cu(func, count, read_cu, write_cu)67 break68 except Exception as e:69 i += 170 self.logger.info("_try_exhaust_cu failed: %s" % str(e))71 if i >= 10:72 self.assert_false()73 def _try_exhaust_cu(self, func, count, read_cu, write_cu):74 start_time = time.time()75 read_cu_sum = 076 write_cu_sum = 077 max_elapsed_time = 1.0 / (read_cu if read_cu != 0 else write_cu) * count78 self.logger.info("StartTime: %s, Count: %s, ReadCU: %s, WriteCU: %s, MaxElapsedTime: %s" % (start_time, count, read_cu, write_cu, max_elapsed_time));79 while count != 0:80 try:81 rc, wc = func()82 read_cu_sum += rc83 write_cu_sum += wc84 count -= 185 self.logger.info("ReadCU: %s, WriteCU: %s, ReadCUSum: %s, WriteCUSum: %s, Count: %s" % (rc, wc, read_cu_sum, write_cu_sum, count))86 except OTSServiceError as e:87 self.assert_error(e, 403, "OTSNotEnoughCapacityUnit", "Remaining capacity unit is not enough.")88 89 end_time = time.time()90 interval = end_time - start_time91 if interval >= max_elapsed_time * 1.2:92 raise Exception('Exceed max elapsed_time: %s, %s' % (interval, max_elapsed_time)) 93 avg_read_cu = read_cu_sum / interval94 avg_write_cu = write_cu_sum / interval95 self.logger.info("Interval: %s, AvgReadCU: %s, AvgWriteCU: %s, ReadCU: %s, WriteCU: %s" % (interval, avg_read_cu, avg_write_cu, read_cu, write_cu))96 if read_cu != 0:97 self.assertTrue(avg_read_cu >= read_cu * 0.8)98 self.assertTrue(avg_read_cu < read_cu * 1.2)99 if write_cu != 0:100 self.assertTrue(avg_write_cu >= write_cu * 0.8)101 self.assertTrue(avg_write_cu < write_cu * 1.2)102 103 def try_to_consuming(self, table_name, pk_dict_exist, pk_dict_not_exist,104 capacity_unit): 105 read = capacity_unit.read 106 write = capacity_unit.write 107 no_check_flag = 0108 if read > 1 or write > 1:109 read = read - 1110 write = write - 1111 no_check_flag = 1112 columns = []113 column_value_size = 4096114 all_pk_length = self.get_row_size(pk_dict_exist, [])115 #write116 for i in range(write):117 if i is not 0:118 columns.append(('X' * i, 'X' * (column_value_size - i)))119 else:120 columns.append(('col0', 'X' * (column_value_size - all_pk_length - 10)))121 if write is not 0:122 row = Row(pk_dict_exist, {'put':columns})123 consumed_update,return_row = self.client_test.update_row(table_name, row, Condition(RowExistenceExpectation.IGNORE))124 expect_consumed = CapacityUnit(0, self.sum_CU_from_row(pk_dict_exist, columns))125 self.assert_consumed(consumed_update, expect_consumed)126 self.assert_equal(write, self.sum_CU_from_row(pk_dict_exist, columns))127 #consume(0, 1)128 if 1 == no_check_flag: 129 try:130 consumed_update,return_row = self.client_test.delete_row(table_name, Row(pk_dict_not_exist), Condition(RowExistenceExpectation.IGNORE))131 except OTSServiceError as e:132 self.assert_false()133 134 #read135 while read >= write and write != 0:136 read = read - write137 consumed_read, return_row, token = self.client_test.get_row(table_name, pk_dict_exist, max_version = 1)138 expect_consumed = CapacityUnit(self.sum_CU_from_row(pk_dict_exist, columns), 0)139 self.assert_consumed(consumed_read, expect_consumed)140 self.assert_equal(return_row.primary_key, pk_dict_exist)141 for i in range(read + no_check_flag):142 consumed_read, return_row, token= self.client_test.get_row(table_name, pk_dict_not_exist, max_version = 1)143 self.assert_consumed(consumed_read, CapacityUnit(1, 0))144 self.assert_equal(return_row, None)145 def check_CU_by_consuming(self, table_name, pk_dict_exist, pk_dict_not_exist, 146 capacity_unit): 147 begin_time = time.time()148 self.try_to_consuming(table_name, pk_dict_exist, pk_dict_not_exist, capacity_unit)149 #只在CU较小进行强验证150 if capacity_unit.write <= 1 and capacity_unit.read <= 1:151 #consume(0, 1)152 try:153 consumed_update,pk,attr = self.client_test.delete_row(table_name, Condition(RowExistenceExpectation.IGNORE), pk_dict_not_exist)154 end_time = time.time()155 if end_time - begin_time < 1:156 self.assert_false()157 except OTSServiceError as e:158 self.assert_error(e, 403, "OTSNotEnoughCapacityUnit", "Remaining capacity unit for write is not enough.")159 #consume(1, 0)160 try:161 consumed_read, pk, attr, token = self.client_test.get_row(table_name, pk_dict_not_exist, max_version = 1)162 end_time = time.time()163 if end_time - begin_time < 1:164 self.assert_false()165 except OTSServiceError as e:166 self.assert_error(e, 403, "OTSNotEnoughCapacityUnit", "Remaining capacity unit for read is not enough.")167 def assert_consumed(self, consumed, expect_consumed):168 if consumed == None or expect_consumed == None:169 self.assert_equal(consumed, expect_consumed)170 else:171 self.assert_equal(consumed.read, expect_consumed.read)172 self.assert_equal(consumed.write, expect_consumed.write)173 def assert_columns(self, columns, expect_columns):174 if columns is None:175 columns = []176 if expect_columns is None:177 expect_columns = []178 self.assert_equal(len(columns), len(expect_columns))179 for index in range(len(columns)):180 self.assert_equal(columns[index][0], expect_columns[index][0])181 self.assert_equal(columns[index][1], expect_columns[index][1])182 183 def assert_RowDataItem_equal(self, response, expect_response):184 self.assert_equal(len(response), len(expect_response))185 for i in range(len(response)):186 self.assert_equal(len(response[i]), len(expect_response[i]))187 for j in range(len(response[i])):188 189 if expect_response[i][j].is_ok and not response[i][j].is_ok:190 raise Exception("BatchGetRow failed on at least one row, ErrorCode: %s ErrorMessage: %s" % (response[i][j].error_code, response[i][j].error_message))191 self.assert_equal(response[i][j].is_ok, expect_response[i][j].is_ok)192 if expect_response[i][j].is_ok:193 self.assert_consumed(response[i][j].consumed, expect_response[i][j].consumed)194 self.assert_equal(response[i][j].primary_key_columns, expect_response[i][j].primary_key_columns)195 self.assert_equal(response[i][j].attribute_columns, expect_response[i][j].attribute_columns)196 else:197 self.assert_equal(response[i][j].error_code, expect_response[i][j].error_code)198 self.assert_equal(response[i][j].error_message, expect_response[i][j].error_message)199 self.assert_consumed(response[i][j].consumed, expect_response[i][j].consumed)200 def assert_BatchWriteRowResponseItem(self, response, expect_response):201 202 self.assert_equal(len(response), len(expect_response))203 item_list = ['put', 'update', 'delete']204 for i in range(len(response)):205 for item in item_list:206 if response[i].has_key(item) == expect_response[i].has_key(item):207 if response[i].has_key(item):208 self.assert_equal(len(response[i][item]), len(expect_response[i][item]))209 for j in range(len(response[i][item])):210 if expect_response[i][item][j].is_ok and not response[i][item][j].is_ok:211 raise Exception("BatchWriteRow failed on at least one row, ErrorCode: %s ErrorMessage: %s" % (response[i][item][j].error_code, response[i][item][j].error_message))212 self.assert_equal(response[i][item][j].is_ok, expect_response[i][item][j].is_ok)213 if expect_response[i][item][j].is_ok:214 self.assert_consumed(response[i][item][j].consumed, expect_response[i][item][j].consumed)215 else:216 self.assert_equal(response[i][item][j].error_code, expect_response[i][item][j].error_code)217 self.assert_equal(response[i][item][j].error_message, expect_response[i][item][j].error_message)218 self.assert_consumed(response[i][item][j].consumed, expect_response[i][item][j].consumed)219 else:220 self.assert_false()221 def assert_time(self, res_time, expect_time, delta=60):222 if res_time == None or expect_time == None:223 self.assert_equal(res_time, expect_time)224 else:225 self.assert_equal((max(res_time, expect_time) - min(res_time, expect_time) <= delta), True)226 def assert_ReservedThroughputDetails(self, details, expect_details):227 self.assert_consumed(details.capacity_unit, expect_details.capacity_unit)228 self.assert_time(details.last_increase_time, expect_details.last_increase_time)229 self.assert_time(details.last_decrease_time, expect_details.last_decrease_time)230 self.assert_equal(details.number_of_decreases_today, expect_details.number_of_decreases_today)231 def assert_UpdateTableResponse(self, response, expect_response):232 self.assert_ReservedThroughputDetails(response.reserved_throughput_details, expect_response.reserved_throughput_details)233 def assert_TableMeta(self, response, expect_response):234 self.assert_equal(response.table_name, expect_response.table_name)235 self.assert_equal(response.schema_of_primary_key, expect_response.schema_of_primary_key)236 def assert_TableOptions(self, response, expect_response):237 self.assert_equal(response.time_to_live, expect_response.time_to_live)238 self.assert_equal(response.max_version, expect_response.max_version)239 self.assert_equal(response.max_time_deviation, expect_response.max_time_deviation)240 def assert_DescribeTableResponse(self, response, expect_capacity_unit, expect_table_meta, expect_options):241 self.assert_consumed(response.reserved_throughput_details.capacity_unit, expect_capacity_unit)242 self.assert_TableMeta(response.table_meta, expect_table_meta)243 self.assert_TableOptions(response.table_options, expect_options)244 def wait_for_capacity_unit_update(self, table_name):245 time.sleep(5)246 def wait_for_partition_load(self, table_name, instance_name=""):247 time.sleep(5)248 def get_primary_keys(self, pk_cnt, pk_type, pk_name="PK", pk_value="x"):249 pk_schema = []250 pk = []251 for i in range(pk_cnt):252 pk_schema.append(("%s%d" % (pk_name, i), pk_type))253 pk.append(("%s%d" % (pk_name, i), pk_value))254 return pk_schema, pk255 def get_row_size(self, pk_dict, column_dict):256 sum = 0257 for v in pk_dict:258 sum += len(v[0])259 if isinstance(v[1], bool):260 sum += 1261 elif isinstance(v[1], (int, long)):262 sum += 8263 elif isinstance(v[1], (types.StringType, bytearray, unicode)):264 sum += len(v[1])265 else:266 raise Exception("wrong type is set in primary value")267 for v in column_dict:268 sum += len(v[0])269 if len(v) == 1:270 pass271 elif isinstance(v[1], bool):272 sum += 1273 elif isinstance(v[1], (int, long, float)):274 sum += 8275 elif isinstance(v[1], (types.StringType, bytearray, unicode)):276 sum += len(v[1])277 else:278 raise Exception("wrong type is set in column value") 279 #sum += 4280 return sum281 def sum_CU_from_row(self, pk_dict, column_dict):282 sum = self.get_row_size(pk_dict, column_dict) 283 return int(math.ceil(sum * 1.0 / 4096))284 285 def _create_table_with_4_pk(self, table_name):286 table_meta = TableMeta(table_name, [('PK0', 'STRING'), ('PK1', 'STRING'), 287 ('PK2', 'STRING'), ('PK3', 'STRING')]) 288 table_options = TableOptions()289 reserved_throughput = ReservedThroughput(CapacityUnit(290 restriction.MaxReadWriteCapacityUnit, 291 restriction.MaxReadWriteCapacityUnit292 ))293 self.client_test.create_table(table_meta, table_options, reserved_throughput)294 self.wait_for_partition_load(table_name)295 def _create_maxsize_row(self, pk_value = 'V'):296 """创建table+生成size恰好为max的pk dict和column dict"""297 pks = []298 for i in range(0, 4):299 pk = 'PK' + str(i)300 value = pk_value * (restriction.MaxPKStringValueLength - len(pk))301 pks.append((pk, value))302 column_size = restriction.MaxPKStringValueLength * 4303 column_n = restriction.MaxColumnDataSizeForRow / column_size - 1304 columns = []305 for i in range(0, column_n):306 key = 'C' + str(i)307 value = 'V' * (column_size - len(key))308 columns.append((key, value))309 return pks, columns310 def get_python_version(self):311 if isinstance(sys.version_info, tuple):312 python_version = '%s_%s_%s' % (sys.version_info[0], sys.version_info[1], sys.version_info[2])313 else:314 python_version = '%s_%s_%s' % (sys.version_info.major, sys.version_info.minor, sys.version_info.micro)315 return python_version...

Full Screen

Full Screen

tests.py

Source:tests.py Github

copy

Full Screen

...77 expect(getattr(z, k), v)78def expect_slider(z, i, expected):79 value = get_value(z, i)80 expect(value, expected)81def expect_response(r, t, **values):82 expect(r['type'], t)83 for k, v in values.items():84 expect(k, 'in', r)85 expect(r[k], v)86def expect_response_progress(r, **values):87 expect('progress', 'in', r)88 p = r['progress']89 for k, v in values.items():90 expect(k, 'in', p)91 expect(p[k], v)92# test case dispatching93def call_live_method(method, group, case, **kwargs): # noqa94 print(f"Testing case '{case}'")95 try:96 test = globals()[f"live_test_{case}"]97 except KeyError:98 raise NotImplementedError("Test case not implemented", case)99 test(method, group.get_players()[0], group.session.params)100# test cases101def live_test_normal(method, player, conf):102 num_sliders = conf['num_sliders']103 retry_delay = conf['retry_delay']104 send(method, player, 'load')105 send(method, player, 'new')106 puzzle = get_last_puzzle(player)107 for i in range(num_sliders):108 last = (i == num_sliders-1)109 target = get_target(puzzle, i)110 # 1st attempt - incorrect111 value = target + SLIDER_SNAP * 2112 resp = send(method, player, 'value', slider=i, value=value)113 expect_puzzle(puzzle, iteration=1, num_correct=i, is_solved=False)114 expect_slider(puzzle, i, value)115 expect_response(resp, 'feedback', slider=i, value=value, is_correct=False, is_completed=False)116 time.sleep(retry_delay)117 # 2nd attempt - correct118 value = target119 resp = send(method, player, 'value', slider=i, value=value)120 expect_puzzle(puzzle, iteration=1, num_correct=i+1, is_solved=last)121 expect_slider(puzzle, i, value)122 expect_response(resp, 'feedback', slider=i, value=value, is_correct=True, is_completed=last)123 time.sleep(retry_delay)124def live_test_normal_timeout(method, player, conf):125 send(method, player, 'load')126 send(method, player, 'new')127 send(method, player, 'value', slider=0, value=100)128def live_test_dropout_timeout(method, player, conf):129 send(method, player, 'load')130 send(method, player, 'new')131def live_test_snapping(method, player, conf):132 send(method, player, 'load')133 send(method, player, 'new')134 puzzle = get_last_puzzle(player)135 solution0 = get_target(puzzle, 0)136 value = solution0 + 100137 snapped = snap_value(value, solution0)138 send(method, player, 'value', slider=0, value=value)139 expect_slider(puzzle, 0, snapped)140def live_test_reloading(method, player, conf):141 # start of the game142 resp = send(method, player, 'load')143 expect(get_last_puzzle(player), None)144 expect_response(resp, 'status')145 expect_response_progress(resp, iteration=0)146 resp = send(method, player, 'new')147 puzzle = get_last_puzzle(player)148 expect_puzzle(puzzle, iteration=1, num_correct=0)149 expect_response(resp, 'puzzle')150 expect_response_progress(resp, iteration=1)151 # 1 answer152 target = get_target(puzzle, 0)153 send(method, player, 'value', slider=0, value=target)154 expect_puzzle(puzzle, iteration=1, num_correct=1)155 expect_slider(puzzle, 0, target)156 # midgame reload157 resp = send(method, player, 'load')158 expect_response(resp, 'status')159 expect_response_progress(resp, iteration=1)160 puzzle = get_last_puzzle(player)161 expect_puzzle(puzzle, iteration=1, num_correct=1)162 expect_slider(puzzle, 0, target)163def live_test_submitting_null(method, player, conf):164 send(method, player, 'load')165 send(method, player, 'new')166 with expect_failure(TypeError):167 method(player.id_in_group, None)168 expect_puzzle(get_last_puzzle(player), iteration=1, num_correct=0, is_solved=False)169def live_test_submitting_empty(method, player, conf):170 send(method, player, 'load')171 send(method, player, 'new')172 with expect_failure(KeyError):173 method(player.id_in_group, {})174 expect_puzzle(get_last_puzzle(player), iteration=1, num_correct=0, is_solved=False)175def live_test_submitting_none(method, player, conf):176 send(method, player, 'load')177 send(method, player, 'new')178 with expect_failure(KeyError):179 send(method, player, 'value')180 expect_puzzle(get_last_puzzle(player), iteration=1, num_correct=0, is_solved=False)181def live_test_submitting_blank(method, player, conf):182 send(method, player, 'load')183 send(method, player, 'new')184 with expect_failure(ValueError):185 send(method, player, 'value', slider=0, value="")186 expect_puzzle(get_last_puzzle(player), iteration=1, num_correct=0, is_solved=False)187def live_test_submitting_premature(method, player, conf):188 send(method, player, 'load')189 with expect_failure(RuntimeError):190 send(method, player, 'value', slider=0, value=100)191def live_test_submitting_toomany(method, player, conf):192 retry_delay = conf['retry_delay']193 send(method, player, 'load')194 send(method, player, 'new')195 puzzle = get_last_puzzle(player)196 target = get_target(puzzle, 0)197 v1 = snap_value(100, target)198 v2 = snap_value(50, target)199 for _ in range(conf['attempts_per_slider']):200 resp = send(method, player, 'value', slider=0, value=v1)201 expect_response(resp, 'feedback')202 expect_slider(puzzle, 0, v1)203 time.sleep(retry_delay)204 with expect_failure(RuntimeError):205 send(method, player, 'value', slider=0, value=v2)206 expect_slider(puzzle, 0, v1)207def live_test_submitting_toofast(method, player, conf):208 send(method, player, 'load')209 send(method, player, 'new')210 puzzle = get_last_puzzle(player)211 target = get_target(puzzle, 0)212 v1 = snap_value(100, target)213 v2 = snap_value(50, target)214 resp = send(method, player, 'value', slider=0, value=v1)215 expect_response(resp, 'feedback')216 expect_slider(puzzle, 0, v1)217 with expect_failure(RuntimeError):218 send(method, player, 'value', slider=0, value=v2)219 expect_slider(puzzle, 0, v1)220def live_test_skipping(method, player, conf):221 send(method, player, 'load')222 send(method, player, 'new')223 with expect_failure(RuntimeError):224 send(method, player, 'new')225 expect_puzzle(get_last_puzzle(player), iteration=1, num_correct=0, is_solved=False)226def live_test_cheat_debug(method, player, conf):227 settings.DEBUG = True228 send(method, player, 'load')229 send(method, player, 'new')230 resp = send(method, player, 'cheat')231 expect_response(resp, 'solution')232def live_test_cheat_nodebug(method, player, conf):233 settings.DEBUG = False234 send(method, player, 'load')235 send(method, player, 'new')236 with expect_failure(RuntimeError):...

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has gained good popularity. Get to know some history of the Playwright with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright gained huge popularity quickly because of some obliging features such as Playwright Test Generator and Inspector, Playwright Reporter, Playwright auto-waiting mechanism and etc. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating them with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright interact with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and different types of dropdowns, such as single selector and multi-selector get your hands-on with handling alerts and dropdown in Playright testing.
  8. Playwright vs Puppeteer: Get to know about the difference between two testing frameworks and how they are different than one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in Parallel with the LammbdaTest test cloud. Get a step-by-step guide to run your Playwright test on the LambdaTest platform.
  10. Playwright Python Tutorial: Playwright automation framework support all major languages such as Python, JavaScript, TypeScript, .NET and etc. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get your hands on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a consecutive in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful