How to use redirected_to method in Playwright Python

Best Python code snippet using playwright-python

admin.py

Source:admin.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2#3# Copyright (C) 2007-2010 Edgewall Software4# All rights reserved.5#6# This software is licensed as described in the file COPYING, which7# you should have received as part of this distribution. The terms8# are also available at http://bitten.edgewall.org/wiki/License.9import shutil10import tempfile11import unittest12from trac.core import TracError13from trac.db import DatabaseManager14from trac.perm import PermissionCache, PermissionError, PermissionSystem15from trac.resource import Resource16from trac.test import EnvironmentStub, Mock17from trac.web.href import Href18from trac.web.main import RequestDone19from bitten.main import BuildSystem20from bitten.model import BuildConfig, TargetPlatform, schema21from bitten.admin import BuildMasterAdminPageProvider, \22 BuildConfigurationsAdminPageProvider23try:24 from trac.perm import DefaultPermissionPolicy25except ImportError:26 DefaultPermissionPolicy = None27class BuildMasterAdminPageProviderTestCase(unittest.TestCase):28 def setUp(self):29 self.env = EnvironmentStub(enable=['trac.*', 'bitten.*'])30 self.env.path = tempfile.mkdtemp()31 # Create tables32 db = self.env.get_db_cnx()33 cursor = db.cursor()34 connector, _ = DatabaseManager(self.env)._get_connector()35 for table in schema:36 for stmt in connector.to_sql(table):37 cursor.execute(stmt)38 # Set up permissions39 self.env.config.set('trac', 'permission_store',40 'DefaultPermissionStore')41 PermissionSystem(self.env).grant_permission('joe', 'BUILD_ADMIN')42 if DefaultPermissionPolicy is not None and hasattr(DefaultPermissionPolicy, "CACHE_EXPIRY"):43 self.old_perm_cache_expiry = DefaultPermissionPolicy.CACHE_EXPIRY44 DefaultPermissionPolicy.CACHE_EXPIRY = -145 # Hook up a dummy repository46 self.repos = Mock(47 get_node=lambda path, rev=None: Mock(get_history=lambda: [],48 isdir=True),49 normalize_path=lambda path: path,50 sync=lambda: None51 )52 self.env.get_repository = lambda authname=None: self.repos # 0.1153 try: # 0.12+54 from trac.core import Component, implements55 from trac.versioncontrol.api import IRepositoryConnector, \56 IRepositoryProvider57 class DummyRepos(Component):58 implements(IRepositoryConnector, IRepositoryProvider)59 def get_supported_types(self):60 yield ('dummy', 9)61 def get_repository(this, repos_type, repos_dir, params):62 return self.repos # Note: 'this' vs 'self' usage63 def get_repositories(self):64 yield ('', {'dir': 'dummy_dir', 'type': 'dummy'})65 self.dummy = DummyRepos66 except ImportError:67 self.dummy = None # not supported, will use get_repository()68 def tearDown(self):69 if self.dummy: # remove from components list + interfaces dict70 self.env.__metaclass__._components.remove(self.dummy)71 for key in self.env.__metaclass__._registry.keys():72 if self.dummy in self.env.__metaclass__._registry[key]:73 self.env.__metaclass__._registry[key].remove(self.dummy)74 if DefaultPermissionPolicy is not None and hasattr(DefaultPermissionPolicy, "CACHE_EXPIRY"):75 DefaultPermissionPolicy.CACHE_EXPIRY = self.old_perm_cache_expiry76 shutil.rmtree(self.env.path)77 def test_get_admin_panels(self):78 provider = BuildMasterAdminPageProvider(self.env)79 req = Mock(perm=PermissionCache(self.env, 'joe'))80 self.assertEqual([('bitten', 'Builds', 'master', 'Master Settings')],81 list(provider.get_admin_panels(req)))82 PermissionSystem(self.env).revoke_permission('joe', 'BUILD_ADMIN')83 req = Mock(perm=PermissionCache(self.env, 'joe'))84 self.assertEqual([], list(provider.get_admin_panels(req)))85 def test_process_get_request(self):86 req = Mock(method='GET', chrome={}, href=Href('/'),87 perm=PermissionCache(self.env, 'joe'))88 provider = BuildMasterAdminPageProvider(self.env)89 template_name, data = provider.render_admin_panel(90 req, 'bitten', 'master', ''91 )92 self.assertEqual('bitten_admin_master.html', template_name)93 assert 'master' in data94 master = data['master']95 self.assertEqual(3600, master.slave_timeout)96 self.assertEqual(0, master.stabilize_wait)97 assert not master.adjust_timestamps98 assert not master.build_all99 self.assertEqual('log/bitten', master.logs_dir)100 def test_process_config_changes(self):101 redirected_to = []102 def redirect(url):103 redirected_to.append(url)104 raise RequestDone105 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),106 abs_href=Href('http://example.org/'), redirect=redirect,107 args={'slave_timeout': '60', 'adjust_timestamps': ''})108 provider = BuildMasterAdminPageProvider(self.env)109 try:110 provider.render_admin_panel(req, 'bitten', 'master', '')111 self.fail('Expected RequestDone')112 except RequestDone:113 self.assertEqual('http://example.org/admin/bitten/master',114 redirected_to[0])115 section = self.env.config['bitten']116 self.assertEqual(60, section.getint('slave_timeout'))117 self.assertEqual(True, section.getbool('adjust_timestamps'))118 self.assertEqual(False, section.getbool('build_all'))119class BuildConfigurationsAdminPageProviderTestCase(unittest.TestCase):120 def setUp(self):121 self.env = EnvironmentStub(enable=['trac.*', 'bitten.*'])122 self.env.path = tempfile.mkdtemp()123 # Create tables124 db = self.env.get_db_cnx()125 cursor = db.cursor()126 connector, _ = DatabaseManager(self.env)._get_connector()127 for table in schema:128 for stmt in connector.to_sql(table):129 cursor.execute(stmt)130 # Set up permissions131 self.env.config.set('trac', 'permission_store',132 'DefaultPermissionStore')133 PermissionSystem(self.env).grant_permission('joe', 'BUILD_CREATE')134 PermissionSystem(self.env).grant_permission('joe', 'BUILD_DELETE')135 PermissionSystem(self.env).grant_permission('joe', 'BUILD_MODIFY')136 if DefaultPermissionPolicy is not None and hasattr(DefaultPermissionPolicy, "CACHE_EXPIRY"):137 self.old_perm_cache_expiry = DefaultPermissionPolicy.CACHE_EXPIRY138 DefaultPermissionPolicy.CACHE_EXPIRY = -1139 # Hook up a dummy repository140 self.repos = Mock(141 get_node=lambda path, rev=None: Mock(get_history=lambda: [],142 isdir=True),143 normalize_path=lambda path: path,144 sync=lambda: None,145 resource=Resource('repository', None)146 )147 self.env.get_repository = lambda authname=None: self.repos # 0.11148 try: # 0.12+149 from trac.core import Component, implements150 from trac.versioncontrol.api import IRepositoryConnector, \151 IRepositoryProvider152 class DummyRepos(Component):153 implements(IRepositoryConnector, IRepositoryProvider)154 def get_supported_types(self):155 yield ('dummy', 9)156 def get_repository(this, repos_type, repos_dir, params):157 return self.repos # Note: 'this' vs 'self' usage158 def get_repositories(self):159 yield ('', {'dir': 'dummy_dir', 'type': 'dummy'})160 self.dummy = DummyRepos161 except ImportError:162 self.dummy = None # not supported, will use get_repository()163 def tearDown(self):164 if self.dummy: # remove from components list + interfaces dict165 self.env.__metaclass__._components.remove(self.dummy)166 for key in self.env.__metaclass__._registry.keys():167 if self.dummy in self.env.__metaclass__._registry[key]:168 self.env.__metaclass__._registry[key].remove(self.dummy)169 if DefaultPermissionPolicy is not None and hasattr(DefaultPermissionPolicy, "CACHE_EXPIRY"):170 DefaultPermissionPolicy.CACHE_EXPIRY = self.old_perm_cache_expiry171 shutil.rmtree(self.env.path)172 def test_get_admin_panels(self):173 provider = BuildConfigurationsAdminPageProvider(self.env)174 req = Mock(perm=PermissionCache(self.env, 'joe'))175 self.assertEqual([('bitten', 'Builds', 'configs', 'Configurations')],176 list(provider.get_admin_panels(req)))177 PermissionSystem(self.env).revoke_permission('joe', 'BUILD_MODIFY')178 req = Mock(perm=PermissionCache(self.env, 'joe'))179 self.assertEqual([], list(provider.get_admin_panels(req)))180 def test_process_view_configs_empty(self):181 req = Mock(method='GET', chrome={}, href=Href('/'),182 perm=PermissionCache(self.env, 'joe'))183 provider = BuildConfigurationsAdminPageProvider(self.env)184 template_name, data = provider.render_admin_panel(185 req, 'bitten', 'configs', ''186 )187 self.assertEqual('bitten_admin_configs.html', template_name)188 self.assertEqual([], data['configs'])189 def test_process_view_configs(self):190 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',191 active=True).insert()192 BuildConfig(self.env, name='bar', label='Bar', path='branches/bar',193 min_rev='123', max_rev='456').insert()194 req = Mock(method='GET', chrome={}, href=Href('/'),195 perm=PermissionCache(self.env, 'joe'))196 provider = BuildConfigurationsAdminPageProvider(self.env)197 template_name, data = provider.render_admin_panel(198 req, 'bitten', 'configs', ''199 )200 self.assertEqual('bitten_admin_configs.html', template_name)201 assert 'configs' in data202 configs = data['configs']203 self.assertEqual(2, len(configs))204 self.assertEqual({205 'name': 'bar', 'href': '/admin/bitten/configs/bar',206 'label': 'Bar', 'min_rev': '123', 'max_rev': '456',207 'path': 'branches/bar', 'active': False, 'recipe': False208 }, configs[0])209 self.assertEqual({210 'name': 'foo', 'href': '/admin/bitten/configs/foo',211 'label': 'Foo', 'min_rev': None, 'max_rev': None,212 'path': 'branches/foo', 'active': True, 'recipe': False213 }, configs[1])214 def test_process_view_config(self):215 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',216 active=True).insert()217 TargetPlatform(self.env, config='foo', name='any').insert()218 req = Mock(method='GET', chrome={}, href=Href('/'),219 perm=PermissionCache(self.env, 'joe'))220 provider = BuildConfigurationsAdminPageProvider(self.env)221 template_name, data = provider.render_admin_panel(222 req, 'bitten', 'configs', 'foo'223 )224 self.assertEqual('bitten_admin_configs.html', template_name)225 assert 'config' in data226 config = data['config']227 self.assertEqual({228 'name': 'foo', 'label': 'Foo', 'description': '', 'recipe': '',229 'path': 'branches/foo', 'min_rev': None, 'max_rev': None,230 'active': True, 'platforms': [{231 'href': '/admin/bitten/configs/foo/1',232 'name': 'any', 'id': 1, 'rules': []233 }]234 }, config)235 def test_process_activate_config(self):236 BuildConfig(self.env, name='foo', path='branches/foo').insert()237 BuildConfig(self.env, name='bar', path='branches/bar').insert()238 redirected_to = []239 def redirect(url):240 redirected_to.append(url)241 raise RequestDone242 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),243 abs_href=Href('http://example.org/'), redirect=redirect,244 authname='joe',245 args={'apply': '', 'active': ['foo']})246 provider = BuildConfigurationsAdminPageProvider(self.env)247 try:248 provider.render_admin_panel(req, 'bitten', 'configs', '')249 self.fail('Expected RequestDone')250 except RequestDone:251 self.assertEqual('http://example.org/admin/bitten/configs',252 redirected_to[0])253 config = BuildConfig.fetch(self.env, name='foo')254 self.assertEqual(True, config.active)255 def test_process_deactivate_config(self):256 BuildConfig(self.env, name='foo', path='branches/foo',257 active=True).insert()258 BuildConfig(self.env, name='bar', path='branches/bar',259 active=True).insert()260 redirected_to = []261 def redirect(url):262 redirected_to.append(url)263 raise RequestDone264 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),265 abs_href=Href('http://example.org/'), redirect=redirect,266 authname='joe',267 args={'apply': ''})268 provider = BuildConfigurationsAdminPageProvider(self.env)269 try:270 provider.render_admin_panel(req, 'bitten', 'configs', '')271 self.fail('Expected RequestDone')272 except RequestDone:273 self.assertEqual('http://example.org/admin/bitten/configs',274 redirected_to[0])275 config = BuildConfig.fetch(self.env, name='foo')276 self.assertEqual(False, config.active)277 config = BuildConfig.fetch(self.env, name='bar')278 self.assertEqual(False, config.active)279 def test_process_add_config(self):280 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',281 active=True).insert()282 redirected_to = []283 def redirect(url):284 redirected_to.append(url)285 raise RequestDone286 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),287 abs_href=Href('http://example.org/'), redirect=redirect,288 authname='joe',289 args={'add': '', 'name': 'bar', 'label': 'Bar'})290 provider = BuildConfigurationsAdminPageProvider(self.env)291 try:292 provider.render_admin_panel(req, 'bitten', 'configs', '')293 self.fail('Expected RequestDone')294 except RequestDone:295 self.assertEqual('http://example.org/admin/bitten/configs/bar',296 redirected_to[0])297 config = BuildConfig.fetch(self.env, name='bar')298 self.assertEqual('Bar', config.label)299 def test_process_add_config_cancel(self):300 redirected_to = []301 def redirect(url):302 redirected_to.append(url)303 raise RequestDone304 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),305 abs_href=Href('http://example.org/'), redirect=redirect,306 args={'cancel': '', 'name': 'bar', 'label': 'Bar'})307 provider = BuildConfigurationsAdminPageProvider(self.env)308 try:309 provider.render_admin_panel(req, 'bitten', 'configs', '')310 self.fail('Expected RequestDone')311 except RequestDone:312 self.assertEqual('http://example.org/admin/bitten/configs',313 redirected_to[0])314 configs = list(BuildConfig.select(self.env, include_inactive=True))315 self.assertEqual(0, len(configs))316 def test_process_add_config_no_name(self):317 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),318 chrome={'warnings': []}, href=Href('/'),319 authname='joe', args={'add': ''})320 provider = BuildConfigurationsAdminPageProvider(self.env)321 try:322 provider.render_admin_panel(req, 'bitten', 'configs', '')323 self.fail('Expected TracError')324 except TracError, e:325 self.assertEqual('Missing required field "name".', e.message)326 self.assertEqual('Add Configuration', e.title)327 def test_process_add_config_invalid_name(self):328 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),329 chrome={'warnings': []}, href=Href('/'), authname='joe',330 args={'add': '', 'name': 'no spaces allowed'})331 provider = BuildConfigurationsAdminPageProvider(self.env)332 try:333 provider.render_admin_panel(req, 'bitten', 'configs', '')334 self.fail('Expected TracError')335 except TracError, e:336 self.assertEqual('The field "name" may only contain letters, '337 'digits, periods, or dashes.', e.message)338 self.assertEqual('Add Configuration', e.title)339 def test_new_config_submit_with_invalid_path(self):340 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),341 authname='joe',342 args={'add': '', 'name': 'foo', 'path': 'invalid/path'})343 def get_node(path, rev=None):344 raise TracError('No such node')345 self.repos.get_node = get_node346 provider = BuildConfigurationsAdminPageProvider(self.env)347 try:348 provider.render_admin_panel(req, 'bitten', 'configs', '')349 self.fail('Expected TracError')350 except TracError, e:351 self.failUnless('Invalid Repository Path' in e.message)352 def test_process_add_config_no_perms(self):353 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',354 active=True).insert()355 PermissionSystem(self.env).revoke_permission('joe', 'BUILD_CREATE')356 req = Mock(method='POST',357 perm=PermissionCache(self.env, 'joe'),358 args={'add': '', 'name': 'bar', 'label': 'Bar'})359 provider = BuildConfigurationsAdminPageProvider(self.env)360 self.assertRaises(PermissionError, provider.render_admin_panel, req,361 'bitten', 'configs', '')362 def test_process_remove_config(self):363 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',364 active=True).insert()365 BuildConfig(self.env, name='bar', label='Bar', path='branches/bar',366 min_rev='123', max_rev='456').insert()367 redirected_to = []368 def redirect(url):369 redirected_to.append(url)370 raise RequestDone371 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),372 abs_href=Href('http://example.org/'), redirect=redirect,373 args={'remove': '', 'sel': 'bar'})374 provider = BuildConfigurationsAdminPageProvider(self.env)375 try:376 provider.render_admin_panel(req, 'bitten', 'configs', '')377 self.fail('Expected RequestDone')378 except RequestDone:379 self.assertEqual('http://example.org/admin/bitten/configs',380 redirected_to[0])381 assert not BuildConfig.fetch(self.env, name='bar')382 def test_process_remove_config_cancel(self):383 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',384 active=True).insert()385 BuildConfig(self.env, name='bar', label='Bar', path='branches/bar',386 min_rev='123', max_rev='456').insert()387 redirected_to = []388 def redirect(url):389 redirected_to.append(url)390 raise RequestDone391 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),392 abs_href=Href('http://example.org/'), redirect=redirect,393 args={'cancel': '', 'sel': 'bar'})394 provider = BuildConfigurationsAdminPageProvider(self.env)395 try:396 provider.render_admin_panel(req, 'bitten', 'configs', '')397 self.fail('Expected RequestDone')398 except RequestDone:399 self.assertEqual('http://example.org/admin/bitten/configs',400 redirected_to[0])401 configs = list(BuildConfig.select(self.env, include_inactive=True))402 self.assertEqual(2, len(configs))403 def test_process_remove_config_no_selection(self):404 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',405 active=True).insert()406 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),407 args={'remove': ''})408 provider = BuildConfigurationsAdminPageProvider(self.env)409 try:410 provider.render_admin_panel(req, 'bitten', 'configs', '')411 self.fail('Expected TracError')412 except TracError, e:413 self.assertEqual('No configuration selected', e.message)414 def test_process_remove_config_bad_selection(self):415 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',416 active=True).insert()417 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),418 args={'remove': '', 'sel': 'baz'})419 provider = BuildConfigurationsAdminPageProvider(self.env)420 try:421 provider.render_admin_panel(req, 'bitten', 'configs', '')422 self.fail('Expected TracError')423 except TracError, e:424 self.assertEqual("Configuration 'baz' not found", e.message)425 def test_process_remove_config_no_perms(self):426 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',427 active=True).insert()428 PermissionSystem(self.env).revoke_permission('joe', 'BUILD_DELETE')429 req = Mock(method='POST',430 perm=PermissionCache(self.env, 'joe'),431 args={'remove': '', 'sel': 'bar'})432 provider = BuildConfigurationsAdminPageProvider(self.env)433 self.assertRaises(PermissionError, provider.render_admin_panel, req,434 'bitten', 'configs', '')435 def test_process_update_config(self):436 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',437 active=True).insert()438 redirected_to = []439 def redirect(url):440 redirected_to.append(url)441 raise RequestDone442 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),443 abs_href=Href('http://example.org/'), redirect=redirect,444 authname='joe', chrome={'warnings': [], 'notices': []},445 href=Href('/'),446 args={'save': '', 'name': 'foo', 'label': 'Foobar',447 'description': 'Thanks for all the fish!'})448 provider = BuildConfigurationsAdminPageProvider(self.env)449 try:450 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')451 self.fail('Expected RequestDone')452 except RequestDone:453 self.assertEqual(['Configuration Saved.'], req.chrome['notices'])454 self.assertEqual('http://example.org/admin/bitten/configs/foo',455 redirected_to[0])456 config = BuildConfig.fetch(self.env, name='foo')457 self.assertEqual('Foobar', config.label)458 self.assertEqual('Thanks for all the fish!', config.description)459 def test_process_update_config_no_name(self):460 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',461 active=True).insert()462 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),463 args={'save': '', 'name': ''}, authname='joe',464 chrome={'warnings':[]}, href=Href('/'))465 provider = BuildConfigurationsAdminPageProvider(self.env)466 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')467 self.failUnless(req.chrome['warnings'], "No warnings?")468 self.assertEquals(['Missing required field "name".'],469 req.chrome['warnings'])470 def test_process_update_config_invalid_name(self):471 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',472 active=True).insert()473 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),474 args={'save': '', 'name': 'no spaces allowed'},475 authname='joe', chrome={'warnings': []},476 href=Href('/'))477 provider = BuildConfigurationsAdminPageProvider(self.env)478 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')479 self.failUnless(req.chrome['warnings'], "No warnings?")480 self.assertEquals(req.chrome['warnings'], ['The field "name" may ' \481 'only contain letters, digits, periods, or dashes.'])482 def test_process_update_config_invalid_path(self):483 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',484 active=True).insert()485 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),486 authname='joe', chrome={'warnings': []}, href=Href('/'),487 args={'save': '', 'name': 'foo', 'path': 'invalid/path'})488 def get_node(path, rev=None):489 raise TracError('No such node')490 self.repos.get_node = get_node491 provider = BuildConfigurationsAdminPageProvider(self.env)492 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')493 494 self.failUnless(req.chrome['warnings'], "No warnings?")495 self.assertTrue(req.chrome['warnings'] in (496 # single repos / 0.11497 ['Invalid Repository Path: "invalid/path" does not exist '498 'within the "(default)" repository.'],499 # multi-repos / 0.12+500 ['Invalid Repository Path "invalid/path".']),501 req.chrome['warnings'])502 def test_process_update_config_non_wellformed_recipe(self):503 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',504 active=True).insert()505 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),506 authname='joe', chrome={'warnings': []}, href=Href('/'),507 args={'save': '', 'name': 'foo', 'recipe': 'not_xml'})508 provider = BuildConfigurationsAdminPageProvider(self.env)509 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')510 self.failUnless(req.chrome['warnings'], "No warnings?")511 self.assertEquals(['Failure parsing recipe: syntax error: line 1, ' \512 'column 0.'], req.chrome['warnings'])513 def test_process_update_config_invalid_recipe(self):514 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',515 active=True).insert()516 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),517 authname='joe', chrome={'warnings': []}, href=Href('/'),518 args={'save': '', 'name': 'foo',519 'recipe': '<build><step /></build>'})520 provider = BuildConfigurationsAdminPageProvider(self.env)521 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')522 523 self.failUnless(req.chrome['warnings'], "No warnings?")524 self.assertEquals(req.chrome['warnings'],525 ['Invalid Recipe: Steps must have an "id" attribute.'])526 def test_process_new_platform_no_name(self):527 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',528 active=True).insert()529 data = {}530 req = Mock(method='POST', chrome={}, hdf=data, href=Href('/'),531 perm=PermissionCache(self.env, 'joe'),532 args={'new': '', 'platform_name': ''})533 provider = BuildConfigurationsAdminPageProvider(self.env)534 try:535 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')536 self.fail("No TracError?")537 except Exception, e:538 self.assertEquals(e.message, 'Missing required field "name"')539 self.assertEquals(e.title, 'Missing field')540 def test_process_new_platform(self):541 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',542 active=True).insert()543 redirected_to = []544 def redirect(url):545 redirected_to.append(url)546 raise RequestDone547 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),548 abs_href=Href('http://example.org/'), redirect=redirect,549 authname='joe',550 args={'add': '', 'new': '', 'platform_name': 'Test'})551 provider = BuildConfigurationsAdminPageProvider(self.env)552 try:553 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')554 self.fail('Expected RequestDone')555 except RequestDone:556 self.assertEqual('http://example.org/admin/bitten/configs/foo/1',557 redirected_to[0])558 platforms = list(TargetPlatform.select(self.env, config='foo'))559 self.assertEqual(1, len(platforms))560 self.assertEqual('Test', platforms[0].name)561 self.assertEqual([], platforms[0].rules)562 def test_process_remove_platforms(self):563 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',564 active=True).insert()565 platform = TargetPlatform(self.env, config='foo', name='any')566 platform.insert()567 redirected_to = []568 def redirect(url):569 redirected_to.append(url)570 raise RequestDone571 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),572 abs_href=Href('http://example.org/'), redirect=redirect,573 authname='joe', chrome={'warnings': [], 'notices': []},574 href=Href('/'),575 args={'remove': '', 'sel': str(platform.id)})576 provider = BuildConfigurationsAdminPageProvider(self.env)577 try:578 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')579 self.fail('Expected RequestDone')580 except RequestDone:581 self.assertEquals(['Target Platform(s) Removed.'],582 req.chrome['notices'])583 self.assertEqual('http://example.org/admin/bitten/configs/foo',584 redirected_to[0])585 platforms = list(TargetPlatform.select(self.env, config='foo'))586 self.assertEqual(0, len(platforms))587 def test_process_remove_platforms_no_selection(self):588 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',589 active=True).insert()590 platform = TargetPlatform(self.env, config='foo', name='any')591 platform.insert()592 redirected_to = []593 def redirect(url):594 redirected_to.append(url)595 raise RequestDone596 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),597 abs_href=Href('http://example.org/'), redirect=redirect,598 authname='joe',599 args={'remove': ''})600 provider = BuildConfigurationsAdminPageProvider(self.env)601 try:602 provider.render_admin_panel(req, 'bitten', 'configs', 'foo')603 self.fail('Expected TracError')604 except TracError, e:605 self.assertEqual('No platform selected', e.message)606 def test_process_edit_platform(self):607 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',608 active=True).insert()609 platform = TargetPlatform(self.env, config='foo', name='any')610 platform.insert()611 req = Mock(method='GET', chrome={}, href=Href('/'),612 perm=PermissionCache(self.env, 'joe'), args={})613 provider = BuildConfigurationsAdminPageProvider(self.env)614 template_name, data = provider.render_admin_panel(615 req, 'bitten', 'configs', 'foo/%d' % platform.id616 )617 self.assertEqual('bitten_admin_configs.html', template_name)618 assert 'platform' in data619 platform = data['platform']620 self.assertEqual({621 'id': 1, 'exists': True, 'name': 'any', 'rules': [('', '')],622 }, platform)623 def test_process_update_platform(self):624 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',625 active=True).insert()626 platform = TargetPlatform(self.env, config='foo', name='any')627 platform.insert()628 redirected_to = []629 def redirect(url):630 redirected_to.append(url)631 raise RequestDone632 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),633 abs_href=Href('http://example.org/'), redirect=redirect,634 authname='joe',635 args={'save': '', 'edit': '', 'name': 'Test',636 'property_0': 'family', 'pattern_0': 'posix'})637 provider = BuildConfigurationsAdminPageProvider(self.env)638 try:639 provider.render_admin_panel(req, 'bitten', 'configs',640 'foo/%d' % platform.id)641 self.fail('Expected RequestDone')642 except RequestDone:643 self.assertEqual('http://example.org/admin/bitten/configs/foo',644 redirected_to[0])645 platforms = list(TargetPlatform.select(self.env, config='foo'))646 self.assertEqual(1, len(platforms))647 self.assertEqual('Test', platforms[0].name)648 self.assertEqual([('family', 'posix')], platforms[0].rules)649 def test_process_update_platform_cancel(self):650 BuildConfig(self.env, name='foo', label='Foo', path='branches/foo',651 active=True).insert()652 platform = TargetPlatform(self.env, config='foo', name='any')653 platform.insert()654 redirected_to = []655 def redirect(url):656 redirected_to.append(url)657 raise RequestDone658 req = Mock(method='POST', perm=PermissionCache(self.env, 'joe'),659 abs_href=Href('http://example.org/'), redirect=redirect,660 authname='joe',661 args={'cancel': '', 'edit': '', 'name': 'Changed',662 'property_0': 'family', 'pattern_0': 'posix'})663 provider = BuildConfigurationsAdminPageProvider(self.env)664 try:665 provider.render_admin_panel(req, 'bitten', 'configs',666 'foo/%d' % platform.id)667 self.fail('Expected RequestDone')668 except RequestDone:669 self.assertEqual('http://example.org/admin/bitten/configs/foo',670 redirected_to[0])671 platforms = list(TargetPlatform.select(self.env, config='foo'))672 self.assertEqual(1, len(platforms))673 self.assertEqual('any', platforms[0].name)674 self.assertEqual([], platforms[0].rules)675def suite():676 suite = unittest.TestSuite()677 suite.addTest(unittest.makeSuite(678 BuildMasterAdminPageProviderTestCase, 'test'679 ))680 suite.addTest(unittest.makeSuite(681 BuildConfigurationsAdminPageProviderTestCase, 'test'682 ))683 return suite684if __name__ == '__main__':...

Full Screen

Full Screen

redirector.py

Source:redirector.py Github

copy

Full Screen

1from datetime import datetime2from logging import Logger, getLogger3from math import floor4from typing import Optional, Any5from aiohttp import ClientSession6from discord import Embed, User, HTTPException7from discord.ext.commands import Cog, command, Option, Context8from discord.ext.menus import ListPageSource, ViewMenuPages9from discord.ext.tasks import loop10from motor.motor_asyncio import AsyncIOMotorCollection11from utils.bots import BOT_TYPES, CustomContext12from utils.database import Document13from utils.misc import random_string, UTC_OFFSET14API_URL: str = "https://redirector.regulad.xyz/"15SHORT_URL: str = "https://crud.space/"16logger: Logger = getLogger(__name__)17class LoggedEvent:18 """A logged redirector event."""19 def __init__(self, link_id: str, redirected_to: str, redirected_at: datetime, remote: str,20 user_agent: Optional[str], *, document: Document):21 self.link_id: str = link_id22 self.redirected_to: str = redirected_to23 self.redirected_at: datetime = redirected_at24 self.remote: str = remote25 self.document: Document = document26 self.user_agent: Optional[str] = user_agent27 __slots__ = ('link_id', 'redirected_to', 'redirected_at', 'remote', 'document', 'user_agent')28class CampaignSource(ListPageSource):29 def __init__(self, campaigns: list[tuple[str, datetime]], *, per_page: int = 10) -> None:30 super().__init__(sorted(campaigns, key=lambda x: x[1], reverse=True), per_page=per_page)31 async def format_page(self, menu, entries):32 offset = menu.current_page * self.per_page33 embed: Embed = Embed(title=f"All campaigns", color=0x00ff00)34 embed.set_footer(text=f"Page {menu.current_page + 1}/{self.get_max_pages()}")35 for iteration, entry in enumerate(entries, start=offset):36 embed.add_field(37 name=f"{iteration + 1}. {entry[0]}",38 value=f"<t:{floor((entry[1] - UTC_OFFSET).timestamp())}>",39 inline=False,40 )41 return embed42class HitSource(ListPageSource):43 def __init__(self, hits: list[LoggedEvent], campaign_id: str, *, per_page: int = 4) -> None:44 self.campaign_id: str = campaign_id45 super().__init__(sorted(hits, key=lambda x: x.redirected_at, reverse=True), per_page=per_page)46 async def format_page(self, menu, entries):47 offset = menu.current_page * self.per_page48 embed: Embed = Embed(49 title=f"Campaign {self.campaign_id} summary", color=0x00ff00, description=f"{len(self.entries)} total hits"50 )51 embed.set_footer(text=f"Page {menu.current_page + 1}/{self.get_max_pages()}")52 for iteration, entry in enumerate(entries, start=offset):53 entry: LoggedEvent54 iteration: int55 embed.add_field(56 name=f"{iteration + 1}. ",57 value=f"Occurred at <t:{floor((entry.redirected_at - UTC_OFFSET).timestamp())}>\n"58 f"Redirected to [{entry.redirected_to}]({entry.redirected_to})\n"59 f"IP Address: `{entry.remote}`"60 + (f"\nUser Agent: `{entry.user_agent}`" if entry.user_agent is not None else ""),61 inline=False,62 )63 return embed64async def get_listen_doc(collection: AsyncIOMotorCollection, link_id: str) -> Document:65 return await Document.get_document(collection, {"_id": link_id})66def get_collection(bot: BOT_TYPES) -> AsyncIOMotorCollection:67 """Returns the database for the Redirector cog."""68 return bot.database["redirector"]69class Redirector(Cog):70 """A cog that allows you to use Redirector, an IP logging and analytics service."""71 def __init__(self, bot: BOT_TYPES) -> None:72 self.bot: BOT_TYPES = bot73 self.client_session: Optional[ClientSession] = None74 @Cog.listener()75 async def on_ready(self) -> None:76 # TODO: Webhook implementation77 self.check_for_redirections.start()78 @loop(seconds=60)79 async def check_for_redirections(self) -> None:80 await self.get_client()81 async for raw_document in get_collection(self.bot).find({}):82 document: Document = Document(raw_document, collection=get_collection(self.bot),83 query={"_id": raw_document["_id"]})84 if document.get("listening_user") is None:85 continue86 else:87 async with self.client_session.get(f"{API_URL}hits/{document['_id']}") as response:88 if response.status == 200:89 data: list[dict[str, Any]] = await response.json()90 for entry in data:91 timestamp: datetime = datetime.fromisoformat(entry["timestamp"])92 if timestamp > document["last_checked"]:93 self.bot.dispatch(94 "redirection",95 LoggedEvent(96 entry["link_id"],97 entry["redirected_to"],98 timestamp,99 entry["remote"],100 entry.get("user_agent"),101 document=document,102 )103 )104 await document.update_db({"$currentDate": {"last_checked": True}})105 else:106 logger.warning(f"Failed to get redirections for {document['_id']}")107 async def get_client(self) -> None:108 if self.client_session is None:109 self.client_session = ClientSession()110 async def cog_before_invoke(self, ctx: Context) -> None:111 await self.get_client()112 def cog_unload(self) -> None:113 if self.client_session is not None:114 self.bot.loop.create_task(self.client_session.close())115 if self.check_for_redirections.is_running():116 self.check_for_redirections.stop()117 @Cog.listener()118 async def on_redirection(self, logged_event: LoggedEvent) -> None:119 """A listener that is called when a redirection is logged."""120 listening_document: Document = logged_event.document121 if listening_document.get("listening_user") is not None:122 listening_user: User = (123 self.bot.get_user(listening_document["listening_user"])124 or await self.bot.fetch_user(listening_document["listening_user"])125 )126 embed: Embed = (127 Embed(128 title="Result",129 description=f"Regarding campaign ID `{logged_event.link_id}`"130 )131 .add_field(132 name="Redirected to:",133 value=f"[{logged_event.redirected_to}]({logged_event.redirected_to})"134 )135 .add_field(136 name="Redirected at:",137 value=f"<t:{floor((logged_event.redirected_at - UTC_OFFSET).timestamp())}>"138 )139 .add_field(140 name="IP Address:",141 value=f"`{logged_event.remote}`"142 )143 )144 if logged_event.user_agent is not None:145 embed: Embed = embed.add_field(146 name="User Agent:",147 value=f"`{logged_event.user_agent}`"148 )149 await listening_user.send(150 "An IP address has been grabbed!",151 embed=embed152 )153 @command()154 async def stopgrabbing(155 self,156 ctx: CustomContext,157 link_id: str = Option(name="campaign_id", description="The campaign ID to stop listening to.")158 ) -> None:159 """Stops listening to a campaign ID."""160 await ctx.defer(ephemeral=True)161 listening_document: Document = await get_listen_doc(get_collection(self.bot), link_id)162 if listening_document.get("listening_user") is None:163 await ctx.send("That campaign ID is not being listened to.", ephemeral=True)164 return165 if listening_document["listening_user"] != ctx.author.id:166 await ctx.send("You are not listening to that campaign ID.", ephemeral=True)167 return168 await listening_document.delete_db()169 await ctx.send("You are no longer listening to that campaign ID.", ephemeral=True)170 @command(aliases=["register_listener", "make_campaign"])171 async def ipgrab(172 self,173 ctx: CustomContext,174 destination: str = Option(description="The URL to redirect the victim to."),175 link_id: Optional[str] = Option(name="campaign_id", description="The campaign ID to register.")176 ) -> None:177 """178 Registers a redirector campaign.179 This can be used for IP grabbing, analytics, link shortening, or anything else that needs to be redirected.180 """181 await ctx.defer(ephemeral=True)182 link_id: str = link_id or random_string(length=6)183 # Start listening184 listening_document: Document = await get_listen_doc(get_collection(self.bot), link_id)185 await listening_document.update_db(186 {"$set": {"listening_user": ctx.author.id}, "$currentDate": {"last_checked": True, "created_at": True}}187 )188 async with self.client_session.post(f"{API_URL}{link_id}", data=destination) as response:189 if response.status != 201:190 raise HTTPException(response, response.reason) # Possibly jank. It should be fine.191 # TODO: Webhook implementation192 await ctx.send(193 f"Campaign link created: <{SHORT_URL}{link_id}>\n"194 f"Your campaign ID is: `{link_id}`\n"195 f"Make sure your DMs are open. You will receive notifications there.",196 ephemeral=True197 )198 @command()199 async def all_campaigns(self, ctx: CustomContext) -> None:200 """Lists all the redirector campaigns you are listening to."""201 await ctx.defer(ephemeral=True)202 all_campaigns: list[tuple[str, datetime]] = []203 async for document in get_collection(self.bot).find({"listening_user": ctx.author.id}):204 all_campaigns.append((document["_id"], document["created_at"]))205 source: CampaignSource = CampaignSource(all_campaigns)206 await ViewMenuPages(source).start(ctx, ephemeral=True)207 @command()208 async def summarize_campaign(209 self,210 ctx: CustomContext,211 link_id: str = Option(name="campaign_id", description="The campaign ID to summarize."),212 ) -> None:213 """Summarizes a redirector campaign."""214 await ctx.defer(ephemeral=True)215 all_hits: list[LoggedEvent] = []216 document: Document = await get_listen_doc(get_collection(self.bot), link_id)217 async with self.client_session.get(f"{API_URL}hits/{document['_id']}") as response:218 data: list[dict[str, Any]] = await response.json()219 for entry in data:220 timestamp: datetime = datetime.fromisoformat(entry["timestamp"])221 all_hits.append(222 LoggedEvent(223 entry["link_id"],224 entry["redirected_to"],225 timestamp,226 entry["remote"],227 entry.get("user_agent"),228 document=document,229 )230 )231 source: HitSource = HitSource(all_hits, link_id)232 await ViewMenuPages(source).start(ctx, ephemeral=True)233def setup(bot: BOT_TYPES) -> None:234 """Loads the Redirector cog."""...

Full Screen

Full Screen

CWSRequests.py

Source:CWSRequests.py Github

copy

Full Screen

1#!/usr/bin/env python32# Contest Management System - http://cms-dev.github.io/3# Copyright © 2010-2012 Giovanni Mascellani <mascellani@poisson.phc.unipi.it>4# Copyright © 2010-2018 Stefano Maggiolo <s.maggiolo@gmail.com>5# Copyright © 2010-2012 Matteo Boscariol <boscarim@hotmail.com>6# Copyright © 2014 Artem Iglikov <artem.iglikov@gmail.com>7# Copyright © 2016 Luca Wehrstedt <luca.wehrstedt@gmail.com>8# Copyright © 2017 Luca Chiodini <luca@chiodini.org>9#10# This program is free software: you can redistribute it and/or modify11# it under the terms of the GNU Affero General Public License as12# published by the Free Software Foundation, either version 3 of the13# License, or (at your option) any later version.14#15# This program is distributed in the hope that it will be useful,16# but WITHOUT ANY WARRANTY; without even the implied warranty of17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the18# GNU Affero General Public License for more details.19#20# You should have received a copy of the GNU Affero General Public License21# along with this program. If not, see <http://www.gnu.org/licenses/>.22import logging23import os24import random25import re26import tempfile27from urllib.parse import parse_qs, urlsplit28from cms import config29from cms.grading.languagemanager import filename_to_language30from cmscommon.crypto import decrypt_number31from cmstestsuite.web import GenericRequest, LoginRequest32logger = logging.getLogger(__name__)33class CWSLoginRequest(LoginRequest):34 def test_success(self):35 if not LoginRequest.test_success(self):36 return False37 if self.redirected_to.rstrip("/") != self.base_url.rstrip("/"):38 return False39 return True40class HomepageRequest(GenericRequest):41 """Load the main page of CWS.42 """43 def __init__(self, browser, username, loggedin, base_url=None):44 GenericRequest.__init__(self, browser, base_url)45 self.url = self.base_url46 self.username = username47 self.loggedin = loggedin48 def describe(self):49 return "check the main page"50 def test_success(self):51 if not GenericRequest.test_success(self):52 return False53 username_re = re.compile(self.username)54 if self.loggedin:55 if username_re.search(self.res_data) is None:56 return False57 else:58 if username_re.search(self.res_data) is not None:59 return False60 return True61class TaskRequest(GenericRequest):62 """Load a task page in CWS.63 """64 def __init__(self, browser, task_id, base_url=None):65 GenericRequest.__init__(self, browser, base_url)66 self.url = "%s/tasks/%s/description" % (self.base_url, task_id)67 self.task_id = task_id68 def describe(self):69 return "load page for task %s (%s)" % (self.task_id, self.url)70class TaskStatementRequest(GenericRequest):71 """Load a task statement in CWS.72 """73 def __init__(self, browser, task_id, language_code, base_url=None):74 GenericRequest.__init__(self, browser, base_url)75 self.url = "%s/tasks/%s/statements/%s" % (self.base_url,76 task_id, language_code)77 self.task_id = task_id78 def describe(self):79 return "load statement for task %s (%s)" % (self.task_id, self.url)80 def specific_info(self):81 return '\nNO DATA DUMP FOR TASK STATEMENTS\n'82class SubmitRequest(GenericRequest):83 """Submit a solution in CWS.84 """85 def __init__(self, browser, task, submission_format,86 filenames, language=None, base_url=None):87 GenericRequest.__init__(self, browser, base_url)88 self.url = "%s/tasks/%s/submit" % (self.base_url, task[1])89 self.task = task90 self.submission_format = submission_format91 self.filenames = filenames92 self.data = {}93 # If not passed, try to recover the language from the filenames.94 if language is None:95 for filename in filenames:96 lang = filename_to_language(filename)97 if lang is not None:98 language = lang.name99 break100 # Only send the language in the request if not None.101 if language is not None:102 self.data = {"language": language}103 def _prepare(self):104 GenericRequest._prepare(self)105 self.files = list(zip(self.submission_format, self.filenames))106 def describe(self):107 return "submit sources %s for task %s (ID %d) %s" % \108 (repr(self.filenames), self.task[1], self.task[0], self.url)109 def specific_info(self):110 return 'Task: %s (ID %d)\nFile: %s\n' % \111 (self.task[1], self.task[0], repr(self.filenames)) + \112 GenericRequest.specific_info(self)113 def test_success(self):114 if not GenericRequest.test_success(self):115 return False116 return self.get_submission_id() is not None117 def get_submission_id(self):118 # Only valid after self.execute()119 # Parse submission ID out of redirect.120 if self.redirected_to is None:121 return None122 query = parse_qs(urlsplit(self.redirected_to).query)123 if "submission_id" not in query or len(query["submission_id"]) != 1:124 logger.warning("Redirected to an unexpected page: `%s'",125 self.redirected_to)126 return None127 try:128 submission_id = decrypt_number(query["submission_id"][0],129 config.secret_key)130 except Exception:131 logger.warning("Unable to decrypt submission id from page: `%s'",132 self.redirected_to)133 return None134 return submission_id135class SubmitUserTestRequest(GenericRequest):136 """Submit a user test in CWS."""137 def __init__(self, browser, task, submission_format,138 filenames, language=None, base_url=None):139 GenericRequest.__init__(self, browser, base_url)140 self.url = "%s/tasks/%s/test" % (self.base_url, task[1])141 self.task = task142 self.submission_format = submission_format143 self.filenames = filenames144 self.data = {}145 # If not passed, try to recover the language from the filenames.146 if language is None:147 for filename in filenames:148 lang = filename_to_language(filename)149 if lang is not None:150 language = lang.name151 break152 # Only send the language in the request if not None.153 if language is not None:154 self.data = {"language": language}155 def _prepare(self):156 GenericRequest._prepare(self)157 # Let's generate an arbitrary input file.158 # TODO: delete this file once we're done with it.159 _, temp_filename = tempfile.mkstemp()160 self.files = \161 list(zip(self.submission_format, self.filenames)) + \162 [("input", temp_filename)]163 def describe(self):164 return "submit user test %s for task %s (ID %d) %s" % \165 (repr(self.filenames), self.task[1], self.task[0], self.url)166 def specific_info(self):167 return 'Task: %s (ID %d)\nFile: %s\n' % \168 (self.task[1], self.task[0], repr(self.filenames)) + \169 GenericRequest.specific_info(self)170 def test_success(self):171 if not GenericRequest.test_success(self):172 return False173 return self.get_user_test_id() is not None174 def get_user_test_id(self):175 # Only valid after self.execute()176 # Parse submission ID out of redirect.177 if self.redirected_to is None:178 return None179 query = parse_qs(urlsplit(self.redirected_to).query)180 if "user_test_id" not in query or len(query["user_test_id"]) != 1:181 logger.warning("Redirected to an unexpected page: `%s'",182 self.redirected_to)183 return None184 try:185 user_test_id = decrypt_number(query["user_test_id"][0],186 config.secret_key)187 except Exception:188 logger.warning("Unable to decrypt user test id from page: `%s'",189 self.redirected_to)190 return None191 return user_test_id192class TokenRequest(GenericRequest):193 """Release test a submission.194 """195 def __init__(self, browser, task, submission_num, base_url=None):196 GenericRequest.__init__(self, browser, base_url)197 self.url = "%s/tasks/%s/submissions/%s/token" % (self.base_url,198 task[1],199 submission_num)200 self.task = task201 self.submission_num = submission_num202 self.data = {}203 def describe(self):204 return "release test the %s-th submission for task %s (ID %d)" % \205 (self.submission_num, self.task[1], self.task[0])206 def specific_info(self):207 return 'Task: %s (ID %d)\nSubmission: %s\n' % \208 (self.task[1], self.task[0], self.submission_num) + \209 GenericRequest.specific_info(self)210class SubmitRandomRequest(GenericRequest):211 """Submit a solution in CWS.212 """213 def __init__(self, browser, task, base_url=None,214 submissions_path=None):215 GenericRequest.__init__(self, browser, base_url)216 self.url = "%s/tasks/%s/submit" % (self.base_url, task[1])217 self.task = task218 self.submissions_path = submissions_path219 self.data = {}220 def _prepare(self):221 """Select a random solution and prepare it for submission.222 If task/ is the task directory, it might contain files (only223 if the submission format is with a single file) and224 directory. If it contains a file, it is assumed that it is the225 only element in the submission format, and is the basename226 without extension of the file. If it is a directory, all files227 inside are assumed to be part of the submission format with228 their basenames without extension.229 """230 GenericRequest._prepare(self)231 # Select a random directory or file inside the task directory.232 task_path = os.path.join(self.submissions_path, self.task[1])233 sources = os.listdir(task_path)234 source = random.choice(sources)235 lang = filename_to_language(source)236 if lang is not None:237 self.data["language"] = lang.name238 self.source_path = os.path.join(task_path, source)239 # Compose the submission format240 self.files = []241 if os.path.isdir(self.source_path):242 submission_formats = os.listdir(self.source_path)243 self.files = [('%s.%%l' % (os.path.splitext(sf)[0]),244 os.path.join(self.source_path, sf))245 for sf in submission_formats]246 else:247 submission_format = os.path.splitext(source)[0]248 self.files = [('%s.%%l' % (submission_format), self.source_path)]249 def describe(self):250 return "submit source %s for task %s (ID %d) %s" % \251 (self.source_path, self.task[1], self.task[0], self.url)252 def specific_info(self):253 return 'Task: %s (ID %d)\nFile: %s\n' % \254 (self.task[1], self.task[0], self.source_path) + \...

Full Screen

Full Screen

reject_redirects_test.py

Source:reject_redirects_test.py Github

copy

Full Screen

1# hammertime: A high-volume http fetch library2# Copyright (C) 2016- Delve Labs inc.3#4# This program is free software; you can redistribute it and/or5# modify it under the terms of the GNU General Public License6# as published by the Free Software Foundation; either version 27# of the License.8#9# This program is distributed in the hope that it will be useful,10# but WITHOUT ANY WARRANTY; without even the implied warranty of11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the12# GNU General Public License for more details.13#14# You should have received a copy of the GNU General Public License15# along with this program; if not, write to the Free Software16# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.17from unittest import TestCase18from unittest.mock import MagicMock, patch19from aiohttp.test_utils import make_mocked_coro20from hammertime.http import Entry, StaticResponse21from hammertime.ruleset import RejectRequest22from tests.fixtures import async_test23from hammertime.rules import RejectCatchAllRedirect, RedirectLimiter24from hammertime.kb import KnowledgeBase25class TestRejectCatchAllRedirect(TestCase):26 def setUp(self):27 self.heuristic = RejectCatchAllRedirect()28 self.host = "http://example.com"29 self.fake_engine = MagicMock()30 self.heuristic.set_engine(self.fake_engine)31 self.heuristic.child_heuristics = MagicMock()32 @async_test()33 async def test_after_headers_request_random_filename_in_same_path_as_initial_request_if_response_is_redirect(self):34 self.fake_engine.perform_high_priority = make_mocked_coro(return_value=MagicMock())35 entry0 = self.create_redirected_request("/admin/restricted-resource.php", redirected_to="/admin/login.php")36 entry1 = self.create_redirected_request("/junkpath", redirected_to="/index.html")37 entry2 = self.create_redirected_request("/path/to/admin/resource", redirected_to="/path/to/admin/login.php")38 with patch("hammertime.rules.redirects.uuid4", MagicMock(return_value="uuid")):39 await self.heuristic.after_headers(entry0)40 await self.heuristic.after_headers(entry1)41 await self.heuristic.after_headers(entry2)42 self.assertRequested(self.host + "/admin/uuid", self.fake_engine.perform_high_priority, order=0)43 self.assertRequested(self.host + "/uuid", self.fake_engine.perform_high_priority, order=1)44 self.assertRequested(self.host + "/path/to/admin/uuid", self.fake_engine.perform_high_priority, order=2)45 @async_test()46 async def test_after_headers_doesnt_request_random_filename_if_response_is_not_redirect(self):47 self.fake_engine.perform_high_priority = make_mocked_coro()48 entry = Entry.create("http://example.com/junkpath", response=StaticResponse(404, {}, b"Not found"))49 await self.heuristic.after_headers(entry)50 self.fake_engine.perform_high_priority.assert_not_called()51 @async_test()52 async def test_after_headers_reject_request_if_request_for_random_file_has_same_redirect_as_initial_request(self):53 initial_request = self.create_redirected_request("/admin/resource.php", redirected_to="/admin/login.php")54 returned_entry = self.create_redirected_request("/admin/uuid", redirected_to="/admin/login.php")55 self.fake_engine.perform_high_priority = make_mocked_coro(return_value=returned_entry)56 with self.assertRaises(RejectRequest):57 await self.heuristic.after_headers(initial_request)58 @async_test()59 async def test_after_headers_transform_relative_location_to_absolute_location(self):60 initial_request = self.create_redirected_request("/admin/resource.php", redirected_to="/admin/login.php",61 relative=True)62 returned_entry = self.create_redirected_request("/admin/uuid", redirected_to="/admin/login.php")63 self.fake_engine.perform_high_priority = make_mocked_coro(return_value=returned_entry)64 with self.assertRaises(RejectRequest):65 await self.heuristic.after_headers(initial_request)66 @async_test()67 async def test_ignore_when_path_is_present_in_redirect(self):68 initial_request = self.create_redirected_request("/admin/a.php", redirected_to="/404.php?path=/admin/a.php")69 returned_entry = self.create_redirected_request("/admin/uuid", redirected_to="/404.php?path=/admin/uuid")70 self.fake_engine.perform_high_priority = make_mocked_coro(return_value=returned_entry)71 with self.assertRaises(RejectRequest):72 with patch("hammertime.rules.redirects.uuid4", MagicMock(return_value="uuid")):73 await self.heuristic.after_headers(initial_request)74 @async_test()75 async def test_before_request_accept_request_if_random_file_not_redirected_to_same_path_as_initial_request(self):76 initial_request = self.create_redirected_request("/admin/resource.php", redirected_to="/admin/login.php")77 returned_entry = self.create_redirected_request("/admin/uuid", redirected_to="/catchAll.html")78 self.fake_engine.perform_high_priority = make_mocked_coro(return_value=returned_entry)79 await self.heuristic.after_headers(initial_request)80 @async_test()81 async def test_after_headers_add_request_response_to_knowledge_base(self):82 first_host = "http://example1.com"83 returned_entry = self.create_redirected_request("/anything", relative=True, redirected_to="/login.php")84 self.fake_engine.perform_high_priority = make_mocked_coro(return_value=returned_entry)85 entry0 = self.create_redirected_request("/admin/restricted-resource.php", host=first_host,86 redirected_to="/login.php")87 entry1 = self.create_redirected_request("/junkpath", host=first_host, redirected_to="/login.php")88 entry2 = self.create_redirected_request("/path/to/admin/resource", host=first_host, redirected_to="/login.php")89 second_host = "http://example2.com"90 entry3 = self.create_redirected_request("/admin/restricted-resource.php", host=second_host,91 redirected_to="/login.php")92 entry4 = self.create_redirected_request("/junkpath/", host=second_host, redirected_to="/login.php")93 entry5 = self.create_redirected_request("/path/to/admin/resource", host=second_host, redirected_to="/login.php")94 kb = KnowledgeBase()95 self.heuristic.set_kb(kb)96 with patch("hammertime.rules.redirects.uuid4", MagicMock(return_value="uuid")):97 await self.ignore_reject_request(self.heuristic.after_headers, entry0)98 await self.ignore_reject_request(self.heuristic.after_headers, entry1)99 await self.ignore_reject_request(self.heuristic.after_headers, entry2)100 await self.ignore_reject_request(self.heuristic.after_headers, entry3)101 await self.ignore_reject_request(self.heuristic.after_headers, entry4)102 await self.ignore_reject_request(self.heuristic.after_headers, entry5)103 self.assertEqual(kb.redirects["%s/admin/" % first_host], "%s/login.php" % first_host)104 self.assertEqual(kb.redirects["%s/" % first_host], "%s/login.php" % first_host)105 self.assertEqual(kb.redirects["%s/path/to/admin/" % first_host], "%s/login.php" % first_host)106 self.assertEqual(kb.redirects["%s/admin/" % second_host], "%s/login.php" % second_host)107 self.assertEqual(kb.redirects["%s/junkpath/" % second_host], "%s/login.php" % second_host)108 self.assertEqual(kb.redirects["%s/path/to/admin/" % second_host], "%s/login.php" % second_host)109 @async_test()110 async def test_after_headers_doesnt_request_random_filename_if_redirect_is_already_in_knowledge_base(self):111 self.fake_engine.perform_high_priority = make_mocked_coro()112 kb = KnowledgeBase()113 self.heuristic.set_kb(kb)114 kb.redirects["%s/wp-admin/" % self.host] = "%s/somewhere" % self.host115 entry = self.create_redirected_request("/wp-admin/admin.php", redirected_to="/wp-login.php")116 await self.heuristic.after_headers(entry)117 self.fake_engine.perform_high_priority.assert_not_called()118 def create_redirected_request(self, path, *, host=None, redirected_to, relative=False):119 host = host or self.host120 if relative:121 headers = {"location": redirected_to}122 else:123 headers = {"location": host + redirected_to}124 response = StaticResponse(code=302, headers=headers, content=b"content")125 return Entry.create(host + path, response=response)126 def assertRequested(self, url, request_method, order=None):127 requested = False128 if order is not None:129 call_list = [request_method.call_args_list[order]]130 else:131 call_list = request_method.call_args_list132 for call in call_list:133 args, kwargs = call134 entry = args[0]135 if entry.request.url == url:136 requested = True137 break138 self.assertTrue(requested)139 async def ignore_reject_request(self, coro, arg):140 try:141 await coro(arg)142 except RejectRequest:143 pass144def redirect(url, redirect_to):145 return Entry.create(url, response=StaticResponse(302, headers={146 "location": redirect_to,147 }))148class RedirectLimiterTest(TestCase):149 @async_test()150 async def test_obvious_redirect_to_404(self):151 limiter = RedirectLimiter(sequence_matching=False)152 with self.assertRaises(RejectRequest):153 entry = redirect("http://example.com/foobar", redirect_to="http://example.com/404.php")154 await limiter.after_headers(entry)155 @async_test()156 async def test_full_matches_only(self):157 limiter = RedirectLimiter(sequence_matching=False)158 await limiter.after_headers(redirect("http://example.com/foobar", redirect_to="http://example.com/foobar/1404"))159 await limiter.after_headers(redirect("http://example.com/foobar", redirect_to="http://example.com/foobar/4041"))160 @async_test()161 async def test_obvious_redirect_to_not_found(self):162 limiter = RedirectLimiter(sequence_matching=False)163 with self.assertRaises(RejectRequest):164 entry = redirect("http://example.com/foobar", redirect_to="http://example.com/not-found")165 await limiter.after_headers(entry)166 @async_test()167 async def test_reject_wildely_different_paths(self):168 limiter = RedirectLimiter()169 with self.assertRaises(RejectRequest):170 await limiter.after_headers(redirect("http://example.com/foobar",171 redirect_to="http://example.com/hello-world"))172 @async_test()173 async def test_accept_similar_paths(self):174 limiter = RedirectLimiter()175 await limiter.after_headers(redirect("http://example.com/foobar", redirect_to="http://example.com/foobar/"))176 @async_test()177 async def test_with_relative_paths(self):178 limiter = RedirectLimiter()179 await limiter.after_headers(redirect("http://example.com/foobar", redirect_to="foobar/"))...

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has gained good popularity. Get to know some history of the Playwright with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright gained huge popularity quickly because of some obliging features such as Playwright Test Generator and Inspector, Playwright Reporter, Playwright auto-waiting mechanism and etc. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating them with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright interact with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and different types of dropdowns, such as single selector and multi-selector get your hands-on with handling alerts and dropdown in Playright testing.
  8. Playwright vs Puppeteer: Get to know about the difference between two testing frameworks and how they are different than one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in Parallel with the LammbdaTest test cloud. Get a step-by-step guide to run your Playwright test on the LambdaTest platform.
  10. Playwright Python Tutorial: Playwright automation framework support all major languages such as Python, JavaScript, TypeScript, .NET and etc. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get your hands on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a consecutive in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful