How to use _run_hook method in ATX

Best Python code snippet using ATX

update_issues.py

Source:update_issues.py Github

copy

Full Screen

...89 return list.__getitem__(self, commit_id)90 def url(self):91 return 'mock/repo'92class TestChangegroupHook(_TestBase):93 def _run_hook(self, commit_messages, warning_count=0, update_count=0):94 repo = _MockRepo(commit_messages)95 update_issues.changegroup_hook(self.ui, repo, 0)96 warnings = self.ui.warn.call_args_list97 updates = self.trac_proxy_mock.ticket.update.call_args_list98 self.assertEqual(len(warnings), warning_count)99 self.assertEqual(len(updates), update_count)100 return updates101 def test_commits_with_invalid_message_format_ignored(self):102 self._run_hook([103 '',104 'Issue #1337 - Extraneous characters in issue number',105 'Issue 1337', # No dash, no message.106 'Issue 1337: Colon instead of dash',107 'Noissue no dash',108 'Issue 1337-No space around dash',109 'Fixes 1337 no dash',110 ], warning_count=7)111 def test_noissue_commits_ignored(self):112 self._run_hook(['Noissue - Foo', 'noissue - Bar']) # No updates.113 def test_single_issue_referenced(self):114 updates = self._run_hook(['Issue 1337 - Foo'], update_count=1)115 self.assertEqual(updates[0][0][0], 1337)116 def test_multiline_commit_message(self):117 updates = self._run_hook(['Issue 1337 - Foo\nBar',118 'Issue 1337 - Bar.\nBaz',119 'Fixes 2448 - Foo\n\nBar',120 'Fixes 2448 - Bar\n \nBaz'],121 update_count=2)122 comment_1337 = updates[0][0][1]123 self.assertIn('Issue 1337 - Foo...]', comment_1337)124 self.assertIn('Issue 1337 - Bar...]', comment_1337)125 comment_2448 = updates[1][0][1]126 self.assertIn('Fixes 2448 - Foo]', comment_2448)127 self.assertIn('Fixes 2448 - Bar]', comment_2448)128 def test_multiline_commit_message_crlf(self):129 updates = self._run_hook(['Issue 1337 - Foo\r\nBar',130 'Issue 1337 - Bar.\r\nBaz',131 'Fixes 2448 - Foo\r\n\r\nBar',132 'Fixes 2448 - Bar\r\n \r\nBaz'],133 update_count=2)134 comment_1337 = updates[0][0][1]135 self.assertIn('Issue 1337 - Foo...]', comment_1337)136 self.assertIn('Issue 1337 - Bar...]', comment_1337)137 comment_2448 = 
updates[1][0][1]138 self.assertIn('Fixes 2448 - Foo]', comment_2448)139 self.assertIn('Fixes 2448 - Bar]', comment_2448)140 def test_missing_issue_referenced(self):141 self._run_hook(['Issue 42 - Bar'], warning_count=1)142 def test_multiple_issues_referenced(self):143 updates = self._run_hook(['Issue 1337, fixes 2448 - Foo'],144 update_count=2)145 self.assertEqual(updates[0][0][0], 1337)146 self.assertEqual(updates[1][0][0], 2448)147 def test_multiple_commits_for_issue(self):148 updates = self._run_hook(['Issue 1337 - Foo', 'Fixes 1337 - Bar'],149 update_count=1)150 comment = updates[0][0][1]151 self.assertIn('000000000000', comment)152 self.assertIn('000000000100', comment)153class TestPushkeyHook(_TestBase):154 def _run_hook(self, commit_messages, bookmark='master',155 warning_count=0, update_count=0):156 repo = _MockRepo(['Base', 'Old'] + commit_messages)157 update_issues.pushkey_hook(self.ui, repo,158 namespace='bookmarks', key=bookmark,159 old=1, new=1 + len(commit_messages))160 warnings = self.ui.warn.call_args_list161 updates = self.trac_proxy_mock.ticket.update.call_args_list162 self.assertEqual(len(warnings), warning_count)163 self.assertEqual(len(updates), update_count)164 return updates165 def _check_update(self, update, issue_id, action='resolve',166 milestone='current'):167 self.assertEqual(update[0][0], issue_id)168 changes = update[0][2]169 self.assertEqual(changes['action'], action)170 if milestone is None:171 self.assertNotIn('milestone', changes)172 else:173 self.assertEqual(changes['milestone'], milestone)174 def test_move_other_bookmark(self):175 self._run_hook(['Fixes 1337 - Foo'], bookmark='other') # No updates.176 def test_one_issue_fixed(self):177 updates = self._run_hook(['Fixes 1337 - Foo'], update_count=1)178 self._check_update(updates[0], 1337)179 def test_fix_closed_issue(self):180 updates = self._run_hook(['fixes 2448 - Foo'], update_count=1)181 self._check_update(updates[0], 2448, action='leave', milestone='other')182 def 
test_fix_issue_noregexp(self):183 updates = self._run_hook(['Fixes 4670 - Foo'], update_count=1)184 self._check_update(updates[0], 4670, milestone=None)185 def test_fix_issue_no_matching_milestones(self):186 updates = self._run_hook(['Fixes 5781 - Foo'], update_count=1)187 self._check_update(updates[0], 5781, milestone=None)188 def test_fix_many(self):189 updates = self._run_hook(['Fixes 1337 - Foo', 'Fixes 2448 - Bar'],190 update_count=2)191 self._check_update(updates[0], 1337)192 self._check_update(updates[1], 2448, action='leave', milestone='other')193 def test_fix_nonexistent(self):194 self._run_hook(['Fixes 7331 - Foo'], warning_count=1)195 def test_fix_closed_with_assigned_milestone(self):196 self._run_hook(['fixes 3559 - Foo']) # No updates.197if __name__ == '__main__':...

Full Screen

Full Screen

app.py

Source:app.py Github

copy

Full Screen

...31 }32 self.args = args33 self.kwargs = kwargs34 logging.info(f'{self.title} [{self.version}] inited')35 def _run_hook(self, hook_name, *args):36 if not self.hooks[hook_name]:37 return38 self.hooks[hook_name](*args)39 def add_hook(self, hook_name, hook_func):40 if hook_name not in self.hooks.keys():41 logging.error(f'Unknown hook {hook_name}')42 return43 self.hooks[hook_name] = hook_func44 def _init_client(self, base_request, send):45 client = Client(base_request, send)46 self.clients[base_request.ws_id] = client47 return client48 def _remove_client(self, client):49 self.clients.pop(client.ws_id)50 def _make_request(self, base_request, message):51 try:52 parsed = json.loads(message['text'])53 except json.JSONDecodeError:54 raise ParseError55 if not validate_jrpc_request(parsed):56 raise InvalidRequest57 if parsed['method'] not in self.methods.keys():58 raise NotFound59 return base_request._make_request(parsed)60 async def _make_response(self, request, client):61 return await self.methods[request.method](request, client)62 def add_method(self, name, method):63 self.methods[name] = method64 def method(self, name):65 def wrapped(method):66 self.methods[name] = method67 return method68 return wrapped69 def add_user(self, client, user):70 client.user = user71 self.users[user.id] = client72 def remove_user(self, client):73 self.users.pop(client.user.id)74 def before(self, func):75 def decorator(method):76 async def wrapped(req, client):77 req, client = func(req, client)78 return await method(req, client)79 return wrapped80 return decorator81 def after(self, func):82 def decorator(method):83 async def wrapped(req, client):84 resp = await method(req, client)85 return func(req, resp, client)86 return wrapped87 return decorator88 async def __call__(self, scope, receive, send):89 if scope['type'] == 'lifespan':90 message = await receive()91 if message['type'] == 'lifespan.startup':92 self._run_hook('before_startup', scope, receive, send)93 await send({'type': 
'lifespan.startup.complete'})94 self._run_hook('after_startup', scope, receive, send)95 return96 elif message['type'] == 'lifespan.shutdown':97 self._run_hook('before_shutdown', scope, receive, send)98 await send({'type': 'lifespan.shutdown.complete'})99 self._run_hook('after_shutdown', scope, receive, send)100 return101 elif scope['type'] == 'websocket':102 pass103 else:104 logging.error(f"Unsupported scope type - {scope['type']}")105 return106 message = await receive()107 if message['type'] == 'websocket.connect':108 self._run_hook('before_connection', scope, receive, send, message)109 else:110 return111 await send(jrpc_accept())112 self._run_hook('after_connection', scope, receive, send)113 base_request = Request(scope, receive, send)114 client = self._init_client(base_request, send)115 while True:116 message = await receive()117 if message['type'] in ('websocket.disconnect', 'websocket.close'):118 self._run_hook('on_disconnection', message, client)119 break120 elif message['type'] == 'websocket.receive':121 try:122 request = self._make_request(base_request, message)123 except CorvusException as e:124 await send(e.error())125 continue126 self._run_hook('on_request', request, client)127 try:128 response = await self._make_response(request, client)129 except CorvusException as e:130 await send(e.error())131 continue132 self._run_hook('on_response', response, client)133 if response:134 await send(jrpc_response(response, request.id))135 else:136 await send(UnknownAsgiAction.error())...

Full Screen

Full Screen

hooks.py

Source:hooks.py Github

copy

Full Screen

...46 if result:47 _LOGGER.debug("ouput of command %s:\n%s" % (command, output))48 return result, output49 return True, None50 def _run_hook(self, hook, node, *parameters):51 parameters = [ str(p) for p in list(parameters)]52 result, output = self._check_command_and_run(_CONFIGURATION_HOOKS.WORKING_FOLDER, hook, [ node ] + list(parameters))53 return result54 def pre_poweron(self, node, *parameters):55 return self._run_hook(_CONFIGURATION_HOOKS.PRE_POWERON, node, *parameters)56 def post_poweron(self, node, *parameters):57 return self._run_hook(_CONFIGURATION_HOOKS.POST_POWERON, node, *parameters)58 def pre_poweroff(self, node, *parameters):59 return self._run_hook(_CONFIGURATION_HOOKS.PRE_POWEROFF, node, *parameters)60 def post_poweroff(self, node, *parameters):61 return self._run_hook(_CONFIGURATION_HOOKS.POST_POWEROFF, node, *parameters)62 def poweredon(self, node, *parameters):63 return self._run_hook(_CONFIGURATION_HOOKS.POWEREDON, node, *parameters)64 def poweredoff(self, node, *parameters):65 return self._run_hook(_CONFIGURATION_HOOKS.POWEREDOFF, node, *parameters)66 def unexpected_poweron(self, node, *parameters):67 return self._run_hook(_CONFIGURATION_HOOKS.UNEXPECTED_POWERON, node, *parameters)68 def unexpected_poweroff(self, node, *parameters):69 return self._run_hook(_CONFIGURATION_HOOKS.UNEXPECTED_POWEROFF, node, *parameters)70 def onerr(self, node, *parameters):71 return self._run_hook(_CONFIGURATION_HOOKS.ONERR, node, *parameters)72 def offerr(self, node, *parameters):73 return self._run_hook(_CONFIGURATION_HOOKS.OFFERR, node, *parameters)74 def unknown(self, node, *parameters):75 return self._run_hook(_CONFIGURATION_HOOKS.UNKNOWN, node, *parameters)76 def idle(self, node, *parameters):77 return self._run_hook(_CONFIGURATION_HOOKS.IDLE, node, *parameters)78 def used(self, node, *parameters):79 return self._run_hook(_CONFIGURATION_HOOKS.USED, node, *parameters)80 def request(self, request):81 requests_string = 
';'.join(request.resources.resources.requests)82 parameters = (request.resources.resources.slots, request.resources.resources.memory, request.resources.taskcount, request.resources.maxtaskspernode, requests_string)83 return self._run_hook(_CONFIGURATION_HOOKS.REQUEST, str(request.id), *parameters)84try:85 HOOKS86except:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful