How to use interrupted_task method in locust

Best Python code snippet using locust

test_locust_class.py

Source:test_locust_class.py Github

copy

Full Screen

...220 221 def test_interrupt_taskset_in_main_taskset(self):222 class MyTaskSet(TaskSet):223 @task224 def interrupted_task(self):225 raise InterruptTaskSet(reschedule=False)226 class MyLocust(Locust):227 host = "http://127.0.0.1"228 task_set = MyTaskSet229 230 class MyTaskSet2(TaskSet):231 @task232 def interrupted_task(self):233 self.interrupt()234 class MyLocust2(Locust):235 host = "http://127.0.0.1"236 task_set = MyTaskSet2237 238 l = MyLocust()239 l2 = MyLocust2()240 self.assertRaises(LocustError, lambda: l.run())241 self.assertRaises(LocustError, lambda: l2.run())242 243 try:244 l.run()245 except LocustError as e:246 self.assertTrue("MyLocust" in e.args[0], "MyLocust should have been referred to in the exception message")247 self.assertTrue("MyTaskSet" in e.args[0], "MyTaskSet should have been referred to in the exception message")248 except:249 raise250 251 try:252 l2.run()253 except LocustError as e:254 self.assertTrue("MyLocust2" in e.args[0], "MyLocust2 should have been referred to in the exception message")255 self.assertTrue("MyTaskSet2" in e.args[0], "MyTaskSet2 should have been referred to in the exception message")256 except:257 raise258 259 def test_on_start_interrupt(self):260 class SubTaskSet(TaskSet):261 def on_start(self):262 if self.kwargs["reschedule"]:263 self.interrupt(reschedule=True)264 else:265 self.interrupt(reschedule=False)266 267 class MyLocust(Locust):268 host = ""269 task_set = SubTaskSet270 271 l = MyLocust()272 task_set = SubTaskSet(l)273 self.assertRaises(RescheduleTaskImmediately, lambda: task_set.run(reschedule=True))274 self.assertRaises(RescheduleTask, lambda: task_set.run(reschedule=False))275 276 def test_parent_attribute(self):277 from locust.exception import StopLocust278 parents = {}279 280 class SubTaskSet(TaskSet):281 def on_start(self):282 parents["sub"] = self.parent283 284 @task285 class SubSubTaskSet(TaskSet):286 def on_start(self):287 parents["subsub"] = self.parent288 @task289 def stop(self):290 raise StopLocust()291 class RootTaskSet(TaskSet):292 tasks = [SubTaskSet]293 294 class MyLocust(Locust):295 host = ""296 task_set = RootTaskSet297 298 l = MyLocust()299 l.run()300 self.assertTrue(isinstance(parents["sub"], RootTaskSet))301 self.assertTrue(isinstance(parents["subsub"], SubTaskSet))302 303class TestWebLocustClass(WebserverTestCase):304 def test_get_request(self):305 self.response = ""306 def t1(l):307 self.response = l.client.get("/ultra_fast")308 class MyLocust(HttpLocust):309 tasks = [t1]310 host = "http://127.0.0.1:%i" % self.port311 my_locust = MyLocust()312 t1(my_locust)313 self.assertEqual(self.response.text, "This is an ultra fast response")314 def test_client_request_headers(self):315 class MyLocust(HttpLocust):316 host = "http://127.0.0.1:%i" % self.port317 locust = MyLocust()318 self.assertEqual("hello", locust.client.get("/request_header_test", headers={"X-Header-Test":"hello"}).text)319 def test_client_get(self):320 class MyLocust(HttpLocust):321 host = "http://127.0.0.1:%i" % self.port322 locust = MyLocust()323 self.assertEqual("GET", locust.client.get("/request_method").text)324 325 def test_client_get_absolute_url(self):326 class MyLocust(HttpLocust):327 host = "http://127.0.0.1:%i" % self.port328 locust = MyLocust()329 self.assertEqual("GET", locust.client.get("http://127.0.0.1:%i/request_method" % self.port).text)330 def test_client_post(self):331 class MyLocust(HttpLocust):332 host = "http://127.0.0.1:%i" % self.port333 locust = MyLocust()334 self.assertEqual("POST", locust.client.post("/request_method", {"arg":"hello world"}).text)335 self.assertEqual("hello world", locust.client.post("/post", {"arg":"hello world"}).text)336 def test_client_put(self):337 class MyLocust(HttpLocust):338 host = "http://127.0.0.1:%i" % self.port339 locust = MyLocust()340 self.assertEqual("PUT", locust.client.put("/request_method", {"arg":"hello world"}).text)341 self.assertEqual("hello world", locust.client.put("/put", {"arg":"hello world"}).text)342 def test_client_delete(self):343 class MyLocust(HttpLocust):344 host = "http://127.0.0.1:%i" % self.port345 locust = MyLocust()346 self.assertEqual("DELETE", locust.client.delete("/request_method").text)347 self.assertEqual(200, locust.client.delete("/request_method").status_code)348 def test_client_head(self):349 class MyLocust(HttpLocust):350 host = "http://127.0.0.1:%i" % self.port351 locust = MyLocust()352 self.assertEqual(200, locust.client.head("/request_method").status_code)353 def test_client_basic_auth(self):354 class MyLocust(HttpLocust):355 host = "http://127.0.0.1:%i" % self.port356 class MyAuthorizedLocust(HttpLocust):357 host = "http://locust:menace@127.0.0.1:%i" % self.port358 class MyUnauthorizedLocust(HttpLocust):359 host = "http://locust:wrong@127.0.0.1:%i" % self.port360 locust = MyLocust()361 unauthorized = MyUnauthorizedLocust()362 authorized = MyAuthorizedLocust()363 response = authorized.client.get("/basic_auth")364 self.assertEqual(200, response.status_code)365 self.assertEqual("Authorized", response.text)366 self.assertEqual(401, locust.client.get("/basic_auth").status_code)367 self.assertEqual(401, unauthorized.client.get("/basic_auth").status_code)368 369 def test_log_request_name_argument(self):370 from locust.stats import global_stats371 self.response = ""372 373 class MyLocust(HttpLocust):374 tasks = []375 host = "http://127.0.0.1:%i" % self.port376 377 @task()378 def t1(l):379 self.response = l.client.get("/ultra_fast", name="new name!")380 my_locust = MyLocust()381 my_locust.t1()382 383 self.assertEqual(1, global_stats.get("new name!", "GET").num_requests)384 self.assertEqual(0, global_stats.get("/ultra_fast", "GET").num_requests)385 386 def test_locust_client_error(self):387 class MyTaskSet(TaskSet):388 @task389 def t1(self):390 self.client.get("/")391 self.interrupt()392 393 class MyLocust(Locust):394 host = "http://127.0.0.1:%i" % self.port395 task_set = MyTaskSet396 397 my_locust = MyLocust()398 self.assertRaises(LocustError, lambda: my_locust.client.get("/"))399 my_taskset = MyTaskSet(my_locust)400 self.assertRaises(LocustError, lambda: my_taskset.client.get("/"))401 402 def test_redirect_url_original_path_as_name(self):403 class MyLocust(HttpLocust):404 host = "http://127.0.0.1:%i" % self.port405 l = MyLocust()406 l.client.get("/redirect")407 408 from locust.stats import global_stats409 self.assertEqual(1, len(global_stats.entries))410 self.assertEqual(1, global_stats.get("/redirect", "GET").num_requests)411 self.assertEqual(0, global_stats.get("/ultra_fast", "GET").num_requests)412class TestCatchResponse(WebserverTestCase):413 def setUp(self):414 super(TestCatchResponse, self).setUp()415 416 class MyLocust(HttpLocust):417 host = "http://127.0.0.1:%i" % self.port418 self.locust = MyLocust()419 420 self.num_failures = 0421 self.num_success = 0422 def on_failure(request_type, name, response_time, exception):423 self.num_failures += 1424 self.last_failure_exception = exception425 def on_success(**kwargs):426 self.num_success += 1427 events.request_failure += on_failure428 events.request_success += on_success429 430 def test_catch_response(self):431 self.assertEqual(500, self.locust.client.get("/fail").status_code)432 self.assertEqual(1, self.num_failures)433 self.assertEqual(0, self.num_success)434 435 with self.locust.client.get("/ultra_fast", catch_response=True) as response: pass436 self.assertEqual(1, self.num_failures)437 self.assertEqual(1, self.num_success)438 439 with self.locust.client.get("/ultra_fast", catch_response=True) as response:440 raise ResponseError("Not working")441 442 self.assertEqual(2, self.num_failures)443 self.assertEqual(1, self.num_success)444 445 def test_catch_response_http_fail(self):446 with self.locust.client.get("/fail", catch_response=True) as response: pass447 self.assertEqual(1, self.num_failures)448 self.assertEqual(0, self.num_success)449 450 def test_catch_response_http_manual_fail(self):451 with self.locust.client.get("/ultra_fast", catch_response=True) as response:452 response.failure("Haha!")453 self.assertEqual(1, self.num_failures)454 self.assertEqual(0, self.num_success)455 self.assertTrue(456 isinstance(self.last_failure_exception, CatchResponseError),457 "Failure event handler should have been passed a CatchResponseError instance"458 )459 460 def test_catch_response_http_manual_success(self):461 with self.locust.client.get("/fail", catch_response=True) as response:462 response.success()463 self.assertEqual(0, self.num_failures)464 self.assertEqual(1, self.num_success)465 466 def test_catch_response_allow_404(self):467 with self.locust.client.get("/does/not/exist", catch_response=True) as response:468 self.assertEqual(404, response.status_code)469 if response.status_code == 404:470 response.success()471 self.assertEqual(0, self.num_failures)472 self.assertEqual(1, self.num_success)473 474 def test_interrupt_taskset_with_catch_response(self):475 class MyTaskSet(TaskSet):476 @task477 def interrupted_task(self):478 with self.client.get("/ultra_fast", catch_response=True) as r:479 raise InterruptTaskSet()480 class MyLocust(HttpLocust):481 host = "http://127.0.0.1:%i" % self.port482 task_set = MyTaskSet483 484 l = MyLocust()485 ts = MyTaskSet(l)486 self.assertRaises(InterruptTaskSet, lambda: ts.interrupted_task())487 self.assertEqual(0, self.num_failures)488 self.assertEqual(0, self.num_success)489 490 def test_catch_response_connection_error_success(self):491 class MyLocust(HttpLocust):492 host = "http://127.0.0.1:1"493 l = MyLocust()494 with l.client.get("/", catch_response=True) as r:495 self.assertEqual(r.status_code, 0)496 self.assertEqual(None, r.content)497 r.success()498 self.assertEqual(1, self.num_success)499 self.assertEqual(0, self.num_failures)500 ...

Full Screen

Full Screen

c_TimekeepingTools.py

Source:c_TimekeepingTools.py Github

copy

Full Screen

1import random2import numpy as np3from datetime import datetime4import os5import pickle6import string7class Timekeeper:8 def __init__(self):9 self.projects = {}10 def getProjectIDs(self):11 # Return the project IDs as a set of keys12 return self.projects.keys()13 def addProject(self, project_id=None, project_name=None):14 # If the project has a specified ID15 if project_id is not None:16 # Check if the project id already exists17 if project_id in self.getProjectIDs():18 # Reference the project and return it19 return self.projects[project_id]20 # Otherwise21 else:22 # Create a new project23 currProject = Project(self, name=project_name)24 # File the project under its ID25 self.projects[currProject.id] = currProject26 # Return it27 return currProject28 def get_new_project_id(self):29 # Unique id switch30 unique_id = False31 # While the ID is not unique32 while unique_id is False:33 project_id = ''34 while len(project_id) < 6:35 project_id += (random.choice([random.choice(string.ascii_uppercase), str(random.randint(0, 9))]))36 # If the project id is not the id keys37 if project_id not in self.getProjectIDs():38 # Switch to unique39 unique_id = True40 # Return the project id41 return project_id42class Feeling:43 def __init__(self, feeling_type, feeling_intensity):44 self.creation_date = datetime.now()45 self.feeling_type = feeling_type46 self.feeling_intensity = feeling_intensity47class Project:48 def __init__(self, timekeeper, name=None, project_id=None):49 self.timekeeper = timekeeper50 self.creation_date = datetime.now()51 self.id = project_id52 self.name = name53 self.tasks = {}54 def get_task_names(self):55 # Task name list56 task_names = []57 # For each task58 for task in self.tasks:59 # Append name to task list60 pass61class Task:62 def __init__(self, project, name=None, task_id=None):63 self.project = project64 self.timekeeper = project.timekeeper65 self.creation_date = None66 self.id = task_id67 self.name = name68 self.instigators = []69 self.originator = None70 self.efforts = []71 self.tags = []72 self.feelings = []73class Person:74 def __init__(self):75 self.name = None76 self.id = None77 self.tasks = []78 self.projects = []79class Effort:80 def __init__(self, task):81 self.project = task.project82 self.task = task83 self.timekeeper = task.project.timekeeper84 self.date = None85 self.start_time = None86 self.end_time = None87 self.duration = None88 self.notes = None89 self.instigator = None90 self.instigation = None91class Interruption:92 def __init__(self, interrupted_task, interrupting_task):93 self.interrupted_task = interrupted_task94 self.interrupting_task = interrupting_task95 self.datetime = datetime.now()96# Check if there is a pickled TimeKeeper in the Files directory97def checkForSavedTK():98 # Change working directory to the Files directory99 os.chdir("F:/USRA/WorkTools/venv/Files/")100 # If there is a saved timekeeper file in the Files101 if os.path.exists(f"{os.getcwd()}TKPickle"):102 # Return true103 return True104 # Otherwise (no saved TK)105 else:106 # Return False107 return False108def main():109 # If there is a pickled TK110 if checkForSavedTK() is True:111 # Open it and unpickle112 currTK = pickle.load(open(f"{os.getcwd()}TKPickle", 'rb'))113 # Otherwise (no pickled TK)114 else:115 # Create a new TK object116 currTK = Timekeeper()117if __name__ == "__main__":...

Full Screen

Full Screen

pendsv.py

Source:pendsv.py Github

copy

Full Screen

...4 if message.split()[0] == "No":5 return message6 else:7 return message.split()[0]8def interrupted_task(big_stack=False):9 psp = gdb.parse_and_eval("$psp")10 11 if big_stack:12 stacked_pc = psp + 14*413 else:14 stacked_pc = psp + 6*415 pc = stacked_pc.cast(gdb.lookup_type("uint32_t").pointer()).dereference()16 function = lookup_symbol(pc)17 curr_task = gdb.parse_and_eval("curr_task")18 task_name = lookup_symbol(curr_task["task"]["fptr"].cast(gdb.lookup_type("uint32_t")))19 return (curr_task["task"], task_name, function)20def break_handler(event):21 if type(event) == gdb.StopEvent:22 return23 if event.breakpoints[0] == swap:24 interrupted = interrupted_task(big_stack=True)25 else:26 interrupted = interrupted_task()27 print "'%s()' interrupted while in '%s()'" % (interrupted[1], interrupted[2])28tick = gdb.Breakpoint("pendsv_handler")29swap = gdb.Breakpoint("context.c:160")30svc = gdb.Breakpoint("svc_handler")31# Run forever32gdb.execute("set height 0")33# Initialize breakpoint handler34gdb.events.stop.connect(break_handler)35while True:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful