How to use the skip_all method in Slash

Best Python code snippet using slash

test_queryaction.py

Source:test_queryaction.py Github

copy

Full Screen

"""Tests for the CRUD query actions (Create/Update/Delete/List/One).

Exercises app.model.queryaction against a freshly initialised test database.
"""
import pytest
from sqlalchemy import text

import app.exc as exc
from app.db import get_dbsess
from app.test import app_with_db_inited
from app.schema import TABLES
from app.model.queryaction import (CreateAction, UpdateAction, DeleteAction,
                                   ListAction, OneAction)

# Flip to False to run the tests guarded by skipif below.
skip_all = True


@pytest.fixture
def create_some_row(app_with_db_inited, data):
    """Insert the 'create' fixture rows, commit, and hand back the session."""
    with app_with_db_inited.app_context():
        dbsess = get_dbsess()
        CreateAction(dbsess, data['create'].keys(), TABLES, data=data['create']).do()
        dbsess.commit()
        return dbsess


@pytest.fixture
def data():
    """Shared payloads: rows to create, update/delete/list specs, and bad data."""
    d = {
        'create': {
            'user': [
                dict(name='abc', password='apassword'),
                dict(name='bbc', password='bpassword'),
                dict(name='cbc', password='cpassword'),
                dict(name='dbc', password='dpassword'),
                dict(name='ebc', password='epassword'),
                dict(name='fbc', password='fpassword')
            ],
            'roll': [
                dict(name='aaaaaaaaa', title='title_a'),
                dict(name='bbbbbbbbb', title='title_b'),
                dict(name='ccccccccc', title='title_c'),
                dict(name='ddddddddd', title='title_d'),
                dict(name='eeeeeeeee', title='title_e'),
                dict(name='fffffffff', title='title_f'),
                dict(name='ggggggggg', title='title_g'),
                dict(name='hhhhhhhhh', title='title_h'),
                dict(name='iiiiiiiii', title='title_i'),
            ]
        },
        'update': {
            # NOTE: only a single-element list/tuple/set is allowed here, and
            # that element must be a table referenced in 'condition'.
            'dist_tables': ['roll'],
            'data': {'name': 'updated name', 'last_update_time': 9777777},
            'condition': [
                [('roll.id', 'lt', 12), ('roll.id', 'gt', '7'), ('roll.name', 'like', '%h%')],
                [('roll.name', 'like', '%a'), ('roll.id', 'lt', 6)]
            ]
        },
        'delete': {
            # NOTE: only a single-element list/tuple/set is allowed here, and
            # that element must be a table referenced in 'condition'.
            'dist_tables': ['roll'],
            'condition': [
                [('roll.id', 'lt', 5), ('roll.id', 'gt', '2')],
            ]
        },
        'delete_2': {
            'dist_tables': ['roll'],
            'condition': [
                [('roll.name', 'like', '%aa')],
            ]
        },
        'delete_3': {
            'dist_tables': ['roll'],
            'condition': [
                [('roll.id', 'in', 'a_@$@_b')],
            ]
        },
        'delete_4': {
            'dist_tables': ['roll'],
            'condition': [
                [('roll.id', 'between', '8_@$@_11')],
            ]
        },
        'list': {
            'dist_tables': ['roll'],
            'condition': [
                # [('roll.id', 'like', '%2')],
                [('roll.id', 'lt', '99'), ]
            ],
            'limit': '3,3',
            'fields': ['roll.id', 'roll.name'],
            'order_by': [('roll.id', 'desc'), ('roll.name', 'asc')]
        },
        'error_data': {
            'create': {
                'table_not_exists': {
                    'some_table': []
                },
                # fields error
                'fields_error': {
                    'user': [{'a': 1, 'b': 2}]
                }
            },
            'update': {
                'table_not_exists': {
                    'dist_tables': ['rollc'],
                    'data': {'name': 'updated name', 'last_update_time': 9777777},
                    'condition': [
                        [('roll.id', 'lt', 12), ('roll.id', 'gt', '7'), ('roll.name', 'like', '%h%')],
                        [('roll.name', 'like', '%a'), ('roll.id', 'lt', 6)]
                    ]
                },
                # fields error
                'fields_error': {
                    'dist_tables': ['roll'],
                    'data': {'name': 'updated name', 'last_update_time': 9777777},
                    'condition': [
                        [('some_other_table_may_make_fileds_error.id', 'lt', 12), ('roll.id', 'gt', '7'), ('roll.name', 'like', '%h%')],
                        [('roll.name', 'like', '%a'), ('roll.id', 'lt', 6)]
                    ]
                },
                'fields_error_2': {
                    'dist_tables': ['roll'],
                    'data': {'name': 'updated name', 'last_update_time': 9777777},
                    'condition': [
                        [('roll.wrong_field_not_in_table', 'lt', 12), ('roll.id', 'gt', '7'), ('roll.name', 'like', '%h%')],
                        [('roll.wrong_field_not_in_table_2', 'like', '%a'), ('roll.id', 'lt', 6)]
                    ]
                },
            },
        }
    }
    yield d


@pytest.mark.skipif(skip_all, reason='just skip it')
def test_create(app_with_db_inited, data):
    """Create the fixture rows and verify every inserted name is present."""
    d = data
    with app_with_db_inited.app_context():
        dbsess = get_dbsess()
        CreateAction(dbsess, d['create'].keys(), TABLES, data=d['create']).do()
        for t in d['create'].keys():
            rs = dbsess.query(TABLES[t])\
                .filter(text('name in (%s)' %
                             ','.join(['"' + row['name'] + '"'
                                       for row in d['create'][t]]))).all()
            assert len(rs) == len(d['create'][t])


@pytest.mark.skipif(skip_all, reason='just skip it')
def test_create_table_not_exists_error(app_with_db_inited, data):
    """Creating into an unknown table raises YcmsTableNotExistsError."""
    d = data['error_data']['create']['table_not_exists']
    with pytest.raises(exc.YcmsTableNotExistsError):
        with app_with_db_inited.app_context():
            dbsess = get_dbsess()
            CreateAction(dbsess, d.keys(), TABLES, data=d).do()
            for t in d.keys():
                rs = dbsess.query(TABLES[t])\
                    .filter(text('name in (%s)' %
                                 ','.join(['"' + row['name'] + '"' for row in d[t]]))).all()
                assert len(rs) == len(d[t])


@pytest.mark.skipif(skip_all, reason='just skip it')
def test_create_fields_error(app_with_db_inited, data):
    """Unknown fields raise YcmsDBFieldNotExistsError (bulk=True only)."""
    d = data['error_data']['create']['fields_error']
    with pytest.raises(exc.YcmsDBFieldNotExistsError):
        with app_with_db_inited.app_context():
            dbsess = get_dbsess()
            CreateAction(dbsess, d.keys(), TABLES, bulk=True, data=d).do()
            for t in d.keys():
                rs = dbsess.query(TABLES[t]).all()
                print(rs)


@pytest.mark.skipif(skip_all, reason='just skip it')
def test_update(app_with_db_inited, data):
    """Update rows matching the given condition."""
    d = data['update']
    with app_with_db_inited.app_context():
        dbsess = get_dbsess()
        UpdateAction(dbsess, dist_tables=d['dist_tables'], table_map_dict=TABLES,
                     condition=d['condition'], data=d['data']).do()


@pytest.mark.skipif(skip_all, reason='just skip it')
def test_update_when_exec_update_sqltable_not_exists_error(app_with_db_inited, data):
    """Updating an unknown table raises YcmsTableNotExistsError."""
    d = data['error_data']['update']['table_not_exists']
    with pytest.raises(exc.YcmsTableNotExistsError):
        with app_with_db_inited.app_context():
            dbsess = get_dbsess()
            UpdateAction(dbsess, dist_tables=d['dist_tables'], table_map_dict=TABLES,
                         condition=d['condition'], data=d['data']).do()


@pytest.mark.skipif(skip_all, reason='just skip it')
def test_update_use_other_table_name_on_field_prefix_raise_when_parse_condition_fields_not_exitsts_error(app_with_db_inited, data):
    """A condition field prefixed with a foreign table raises YcmsSqlConditionParseError."""
    d = data['error_data']['update']['fields_error']
    with pytest.raises(exc.YcmsSqlConditionParseError):
        with app_with_db_inited.app_context():
            dbsess = get_dbsess()
            UpdateAction(dbsess, dist_tables=d['dist_tables'], table_map_dict=TABLES,
                         condition=d['condition'], data=d['data']).do()


@pytest.mark.skipif(skip_all, reason='just skip it')
def test_update_use_wrong_field_name_of_right_table_raise_when_parse_condition_fields_not_exitsts_error(app_with_db_inited, data):
    """A wrong field name on the right table raises YcmsSqlConditionParseError."""
    d = data['error_data']['update']['fields_error_2']
    with pytest.raises(exc.YcmsSqlConditionParseError):
        with app_with_db_inited.app_context():
            dbsess = get_dbsess()
            UpdateAction(dbsess, dist_tables=d['dist_tables'], table_map_dict=TABLES,
                         condition=d['condition'], data=d['data']).do()


@pytest.mark.skipif(skip_all, reason='just skip it')
def test_update_no_condition_error(app_with_db_inited, data):
    """A missing condition raises exc.YcmsDangerActionError."""
    d = data['update']
    with pytest.raises(exc.YcmsDangerActionError):
        with app_with_db_inited.app_context():
            dbsess = get_dbsess()
            UpdateAction(dbsess, dist_tables=d['dist_tables'], table_map_dict=TABLES,
                         condition=[], data=d['data']).do()


@pytest.mark.skipif(skip_all, reason='just skip it')
def test_delete(app_with_db_inited, data):
    """Delete rows with four condition styles (lt/gt, like, in, between)."""
    d = data['delete']
    d_2 = data['delete_2']
    d_3 = data['delete_3']
    d_4 = data['delete_4']
    with app_with_db_inited.app_context():
        dbsess = get_dbsess()
        CreateAction(dbsess, data['create'].keys(), TABLES, data=data['create']).do()
        dbsess.commit()

        # 2 < id < 5 removes ids 3 and 4.
        DeleteAction(dbsess, dist_tables=d['dist_tables'], table_map_dict=TABLES,
                     condition=d['condition']).do()
        dbsess.commit()
        ids = []
        for row in dbsess.query(TABLES[d['dist_tables'][0]]).with_entities(
                TABLES[d['dist_tables'][0]].id, TABLES[d['dist_tables'][0]].name
        ).all():
            ids.append(row[0])
            print(row)
        assert ids == [1, 2, 5, 6, 7, 8, 9]

        # name like '%aa' removes id 1.
        DeleteAction(dbsess, dist_tables=d_2['dist_tables'], table_map_dict=TABLES,
                     condition=d_2['condition']).do()
        dbsess.commit()
        ids = []
        for row in dbsess.query(TABLES[d['dist_tables'][0]]).with_entities(
                TABLES[d['dist_tables'][0]].id, TABLES[d['dist_tables'][0]].name
        ).all():
            ids.append(row[0])
        assert len(ids) == 6 and ids == [2, 5, 6, 7, 8, 9]

        # 'in' with non-matching values deletes nothing.
        DeleteAction(dbsess, dist_tables=d_3['dist_tables'], table_map_dict=TABLES,
                     condition=d_3['condition']).do()
        dbsess.commit()
        ids = []
        for row in dbsess.query(TABLES[d['dist_tables'][0]]).with_entities(
                TABLES[d['dist_tables'][0]].id, TABLES[d['dist_tables'][0]].name
        ).all():
            ids.append(row[0])
        assert len(ids) == 6 and ids == [2, 5, 6, 7, 8, 9]

        # between 8 and 11 removes ids 8 and 9.
        DeleteAction(dbsess, dist_tables=d_4['dist_tables'], table_map_dict=TABLES,
                     condition=d_4['condition']).do()
        dbsess.commit()
        ids = []
        for row in dbsess.query(TABLES[d['dist_tables'][0]]).with_entities(
                TABLES[d['dist_tables'][0]].id, TABLES[d['dist_tables'][0]].name
        ).all():
            ids.append(row[0])
        assert ids == [2, 5, 6, 7]


# @pytest.mark.skipif(skip_all, reason='just skip it')
def test_list(app_with_db_inited, data, create_some_row):
    """List rows with condition, ordering, field selection and limit."""
    with app_with_db_inited.app_context():
        d = data['list']
        dbsess = get_dbsess()
        rs = ListAction(dbsess, table_map_dict=TABLES, dist_tables=d['dist_tables'],
                        condition=d['condition'], order_by=d['order_by'],
                        fields=d['fields'], limit=d['limit']).do()
        print(rs)


# @pytest.mark.skipif(skip_all, reason='just skip it')
def test_one(app_with_db_inited, data, create_some_row):
    """Fetch a single row with OneAction."""
    with app_with_db_inited.app_context():
        d = data['list']
        dbsess = get_dbsess()
        rs = OneAction(dbsess, table_map_dict=TABLES, dist_tables=d['dist_tables'],
                       condition=d['condition'], fields=d['fields']).do()
        # NOTE(review): the scraped listing is truncated here ("...").

Full Screen

Full Screen

test_scraping.py

Source:test_scraping.py Github

copy

Full Screen

1"""2If run as script serve as fake web-server with materials to scrape3"""4from aiohttp import web5import mimetypes6import aiohttp_jinja27import jinja28from hypothesis import given9from hypothesis import strategies as st10from scraper.kinozal.kinozal_scraper import KinozalScraper11import urllib.parse12import pytest13import itertools14from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop, TestServer15from aiohttp.web import HTTPNotFound16import logging17import time18skip_all = False # True # to run specific test we just skip all other tests19logging.basicConfig(filename='test_scraping.log', level=logging.DEBUG)20TEST_SERVER_PORT=4434 # differs from production so we can run tests and production at the same machine simultaniously21async def list_handler(request):22 """23 Test web server.24 Returns 3 pages of search result (does not depends on the actual search string)25 :param request:26 :return:27 """28 params = request.rel_url.query29 page = int(params['page'])30 if page > 3:31 raise HTTPNotFound()32 response = aiohttp_jinja2.render_template(f'better_call_saul_{page}.html',33 request,34 {})35 response.headers['Content-Language'] = 'ru'36 response.headers['Content-Type'] = 'text/html; charset=utf-8'37 return response38async def details_handler(request):39 """40 Test web server.41 For any movie details page request returns one of the 5 predefined pages.42 """43 params = request.rel_url.query44 id = int(params['id'])45 card_num = id % 5 + 146 response = aiohttp_jinja2.render_template(f'better_call_saul_card_{card_num}.html',47 request,48 {})49 response.headers['Content-Language'] = 'ru'50 response.headers['Content-Type'] = 'text/html; charset=utf-8'51 return response52def aioapp():53 app = web.Application()54 # app.router.add_static('/', 'scraper/kinozal', name='static', )55 app.router.add_get('/browse.php', list_handler)56 app.router.add_get('/details.php', details_handler)57 mimetypes.init()58 mimetypes.types_map['.php'] = 'text/html; 
charset=windows-1251'59 aiohttp_jinja2.setup(app,60 loader=jinja2.FileSystemLoader('test'))61 return app62@pytest.fixture(params=['localhost', 'example.com', 'http://my.domain.com/'])63def host(request):64 return request.param65@pytest.fixture(params=['Better Call Saul', 'Лучше позвать Сола', 'h834687564325435%$^^%$$^%32sdgvD'])66def search_string(request):67 return request.param68@pytest.mark.skipif(skip_all, reason='Do not test this case')69@given(page=st.integers(min_value=1, max_value=1000))70def test_kinozal_url(host, search_string, page):71 s = KinozalScraper(host)72 page_url = s.search_list_url(search_string, page)73 parsed_url = urllib.parse.urlparse(page_url)74 assert parsed_url.scheme == '' or parsed_url.scheme.startswith('http')75 #assert parsed_url.netloc - could be empty or localhost, or example.com, no feasible way to test76 assert parsed_url.path == '/browse.php'77 assert 's=' in parsed_url.query78 assert 'q=0' in parsed_url.query79 assert f'page={page}' in parsed_url.query80 assert search_string in itertools.chain.from_iterable(urllib.parse.parse_qsl(parsed_url.query))81@pytest.mark.skipif(skip_all, reason='Do not test this case')82@given(id=st.integers(min_value=100000, max_value=10000000))83def test_details_url(host, id):84 s = KinozalScraper(host)85 page_url = s.details_url(f'details.php?id={id}')86 parsed_url = urllib.parse.urlparse(page_url)87 assert parsed_url.scheme == '' or parsed_url.scheme.startswith('http')88 #assert parsed_url.netloc - could be empty or localhost, or example.com, no feasible way to test89 assert parsed_url.path == '/details.php'90@pytest.mark.skipif(skip_all, reason='Do not test this case')91def test_extract_episode():92 scraper = KinozalScraper(f'localhost:{TEST_SERVER_PORT}', max_not_a_robot_delay=0)93 page_html = open('test/better_call_saul_1.html', 'r').read()94 movies_count = 095 for movie in scraper.extract_episode(page_html, 'fake_url'):96 movies_count += 197 if movies_count == 2:98 assert movie['title'] == 'Лучше 
звоните Солу (4 сезон: 1-10 серии из 10) / Better Call Saul / 2018 / ПД (Кубик в Кубе) / WEBRip (720p)'99 assert movie['details_link'] == '/details.php?id=1638000'100 assert movie['seeds_num'] == '15'101 assert movie['size'] == 15190000000102 assert movie['id'] == 'ktv_1638000'103 assert movie['last_season'] == 4104 assert movie['last_episode'] == 10105 assert movie['seasons'] == [4]106 assert movie['torrent_link'] == 'http://dl.kinozal.tv/download.php?id=1638000'107 assert movies_count == 50108@pytest.mark.skipif(skip_all, reason='Do not test this case')109def test_extract_details():110 scraper = KinozalScraper('localhost:4433', max_not_a_robot_delay=0)111 page_html = open('test/better_call_saul_card_1.html', 'r').read()112 movie = scraper.extract_details(page_html)113 assert movie['audio'] == 'Русский (AC3, 6 ch, 640 Кбит/с), английский (E-AC3, 6 ch, 640 Кбит/с)'114 assert movie['quality'] == 'WEBRip ( 2160p)'115 assert movie['subtitles'] == 'Русские, английские'116 assert movie['has_english_subtitles'] == True117 assert movie['has_english_audio'] == True118class TestScrapingAsync(AioHTTPTestCase):119 async def get_application(self):120 return aioapp()121 async def get_server(self, app):122 return TestServer(app, loop=self.loop, host='127.0.0.1', port=TEST_SERVER_PORT)123 """124 aiohttp test case is not pytest, but unittest. 
125 So we have to replace pytest fixtures126 """127 search_string = 'Better Call Saul'128 @pytest.mark.skipif(skip_all, reason='Do not test this case')129 @unittest_run_loop130 async def test_i_am_not_a_robot_delay(self):131 scraper = KinozalScraper(f'localhost:{TEST_SERVER_PORT}')132 start = time.time()133 await scraper.i_am_not_a_robot_delay()134 assert time.time() - start > 0.009135 @pytest.mark.skipif(skip_all, reason='Do not test this case')136 @unittest_run_loop137 async def test_list_page(self):138 scraper = KinozalScraper(f'localhost:{TEST_SERVER_PORT}', max_not_a_robot_delay=0)139 page_count = 0140 async for _ in scraper.list_page(self.search_string):141 page_count += 1142 assert page_count == 3, 'Test web-server returns exactly three pages with search results'143 @pytest.mark.skipif(skip_all, reason='Do not test this case')144 @unittest_run_loop145 async def test_episodes(self):146 scraper = KinozalScraper(f'localhost:{TEST_SERVER_PORT}', max_not_a_robot_delay=0)147 movies_count = 0148 async for movie in scraper.episodes(self.search_string):149 movies_count += 1150 if movies_count == 130:151 assert movie['title'] == 'Лучше звоните Солу (1 сезон: 1-10 серии из 10) / Better Call Saul / 2015 / ЛО (Kerob) / WEB-DLRip'152 assert movie['details_link'] == '/details.php?id=1307895'153 assert movie['seeds_num'] == '0'154 assert movie['size'] == 4110000000155 assert movie['id'] == 'ktv_1307895'156 assert movie['last_season'] == 1157 assert movie['last_episode'] == 10158 assert movie['seasons'] == [1]159 if movies_count == 86:160 assert movie['title'] == 'Лучше звоните Солу (1-2 сезон) / Better Call Saul (Unofficial) / Soundtrack / 2015-2016 / MP3'161 assert movie['details_link'] == '/details.php?id=1453060'162 assert movie['seeds_num'] == '0'163 assert movie['size'] == 637000000164 assert movie['id'] == 'ktv_1453060'165 assert movies_count == 130166 @pytest.mark.skipif(skip_all, reason='Do not test this case')167 @unittest_run_loop168 async def 
test_details(self):169 scraper = KinozalScraper(f'localhost:{TEST_SERVER_PORT}', max_not_a_robot_delay=0)170 movie = await scraper.details('/details.php?id=1534654')171 assert movie['audio'] == 'AC3, 2 ch, 192 Кбит/с'172 assert movie['has_english_subtitles'] == False173 @pytest.mark.skipif(skip_all, reason='Do not test this case')174 @unittest_run_loop175 async def test_find_episodes(self):176 scraper = KinozalScraper(f'localhost:{TEST_SERVER_PORT}', max_not_a_robot_delay=0)177 movies_count = 0178 async for movie in scraper.find_episodes(self.search_string, season=3, min_episode=9):179 movies_count += 1180 assert movies_count == 36181def run_fake_web_server():...

Full Screen

Full Screen

processors.py

Source:processors.py Github

copy

Full Screen

# NOTE(review): this scraped snippet begins mid-function and ends mid-try;
# the first function header and the last exception handler are reconstructed
# and marked below — confirm against the upstream source.


def copy_all_a(input_a, *other_inputs, **kwargs):
    """Copy every reading from input a into the output.

    All other inputs are skipped so that no readings remain in any of the
    input walkers after this function runs.

    NOTE(review): the header of this function was truncated in the scraped
    source; only the body below was visible.

    Returns:
        list(IOTileReading)
    """
    output = []
    while input_a.count() > 0:
        output.append(input_a.pop())
    for input_x in other_inputs:
        input_x.skip_all()
    return output


def copy_latest_a(input_a, *other_inputs, **kwargs):
    """Copy the latest reading from input a into the output.

    All other inputs are skipped so that after this function runs there are
    no readings left in any of the input walkers even if no output is
    generated.

    Returns:
        list(IOTileReading)
    """
    last_reading = None
    # An inexhaustible walker always yields a value on pop(); otherwise we
    # drain the queue and keep the final reading.
    if input_a.selector.inexhaustible:
        last_reading = input_a.pop()
    else:
        while input_a.count() > 0:
            last_reading = input_a.pop()

    output = [] if last_reading is None else [last_reading]
    for input_x in other_inputs:
        input_x.skip_all()
    return output


def copy_count_a(input_a, *other_inputs, **kwargs):
    """Emit the number of readings currently queued in input a.

    Every input (including input a) is skipped so that after this function
    runs there are no readings left in any of the input walkers, even if no
    output is generated.

    Returns:
        list(IOTileReading)
    """
    count = input_a.count()
    input_a.skip_all()
    for input_x in other_inputs:
        input_x.skip_all()
    return [IOTileReading(0, 0, count)]


def call_rpc(*inputs, **kwargs):
    """Call an RPC based on the encoded value read from input b.

    The response of the RPC must be a 4 byte value that is used as
    the output of this call.  The encoded RPC must be a 32 bit value
    encoded as "BBH":
        B: ignored, should be 0
        B: the address of the tile that we should call
        H: The id of the RPC to call

    All other readings are then skipped so that there are no
    readings in any input queue when this function returns.

    Returns:
        list(IOTileReading)
    """
    rpc_executor = kwargs['rpc_executor']
    output = []
    try:
        value = inputs[1].pop()
        addr = value.value >> 16
        rpc_id = value.value & 0xFFFF
        reading_value = rpc_executor.rpc(addr, rpc_id)
        output.append(IOTileReading(0, 0, reading_value))
    except (HardwareError, StreamEmptyError):
        # Best effort: a failed/empty RPC simply produces no output reading.
        pass

    for input_x in inputs:
        input_x.skip_all()
    return output


def trigger_streamer(*inputs, **kwargs):
    """Trigger a streamer based on the index read from input b.

    Returns:
        list(IOTileReading)
    """
    streamer_marker = kwargs['mark_streamer']
    try:
        reading = inputs[1].pop()
    except StreamEmptyError:
        return []
    finally:
        # Inputs are skipped regardless of whether a reading was available.
        for input_x in inputs:
            input_x.skip_all()

    try:
        streamer_marker(reading.value)
    except ArgumentError:
        return []
    return [IOTileReading(0, 0, 0)]


def subtract_afromb(*inputs, **kwargs):
    """Subtract stream a from stream b.

    Returns:
        list(IOTileReading)
    """
    try:
        value_a = inputs[0].pop()
        value_b = inputs[1].pop()
        return [IOTileReading(0, 0, value_b.value - value_a.value)]
    # NOTE(review): the handler below was truncated in the scraped source
    # ("..."); presumably an empty stream yields no output — confirm upstream.
    except StreamEmptyError:
        return []

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites and running your first automation test, through following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful