How to use get_process method in autotest

Best Python code snippet using autotest_python

test_api.py

Source:test_api.py Github

copy

Full Screen

# ... (excerpt starts at line 11 of test_api.py; `server`, `STATUS`,
# `tmp_for_test` and `os` are defined/imported above the excerpt)
class TestAPI(object):
    """Tests for the process-management calls exposed by ``server``."""

    def test_api_by_id(self):
        """Drive stop/start/restart by program id and check the status after each call."""
        dpid = server.create_process("test_api_by_id", "sleep 60")  # create the process
        assert dpid
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.stop_process(program_id=dpid)  # normal stop
        assert server.get_process(program_id=dpid)["status"] == STATUS.STOPPED
        assert server.stop_process(program_id=dpid)  # stop an already-stopped process
        assert server.get_process(program_id=dpid)["status"] == STATUS.STOPPED
        assert server.restart_process(program_id=dpid)  # restart a stopped process
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.start_process(program_id=dpid)  # start an already-running process
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.restart_process(program_id=dpid)  # normal restart
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.stop_process(program_id=dpid)  # normal stop
        assert server.get_process(program_id=dpid)["status"] == STATUS.STOPPED
        assert server.start_process(program_id=dpid)  # normal start
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.stop_process(program_id=dpid)  # cleanup
        assert server.get_process(program_id=dpid)["status"] == STATUS.STOPPED

    def test_server_by_name(self):
        """Same sequence as test_api_by_id, but addressing the process by its name."""
        dpid = server.create_process("test_server_by_name",
                                     "sleep 60")  # create the process
        assert dpid
        # Fetched once so that the name reported by the server is used below.
        process = server.get_process(program_id=dpid)
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.stop_process(program_name=process["name"])  # normal stop
        assert server.get_process(program_id=dpid)["status"] == STATUS.STOPPED
        assert server.stop_process(program_name=process["name"])  # stop an already-stopped process
        assert server.get_process(program_id=dpid)["status"] == STATUS.STOPPED
        assert server.restart_process(program_name=process["name"])  # restart a stopped process
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.start_process(program_name=process["name"])  # start an already-running process
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.restart_process(program_name=process["name"])  # normal restart
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.stop_process(program_name=process["name"])  # normal stop
        assert server.get_process(program_id=dpid)["status"] == STATUS.STOPPED
        assert server.start_process(program_name=process["name"])  # normal start
        assert server.get_process(program_id=dpid)["status"] == STATUS.RUNNING
        assert server.stop_process(program_name=process["name"])  # cleanup
        assert server.get_process(program_id=dpid)["status"] == STATUS.STOPPED

    def test_create_twice(self):
        """Create the same program name twice; both calls must return a truthy id."""
        dpid = server.create_process("test_create_twice",
                                     "sleep 60",
                                     auto_start=False)  # create the process
        assert dpid
        dpid = server.create_process("test_create_twice",
                                     "sleep 60")  # create the process again
        assert dpid
        assert server.stop_process(program_id=dpid)  # cleanup

    def test_status(self):
        """Create a process with explicit options and check get_process reports each one back."""
        name = "test_status"
        command = "sleep 60"
        directory = "/tmp"
        environment = "A=a;B=b;C=c"
        auto_start = False
        auto_restart = True
        machines = '127.0.0.1;localhost'
        touch_timeout = 60
        stdout_logfile = '/tmp/stdout.log'
        stderr_logfile = '/tmp/stderr.log'
        max_fail_count = 10
        dpid = server.create_process(name, command,
                                     directory=directory,
                                     environment=environment,
                                     auto_start=auto_start,
                                     auto_restart=auto_restart,
                                     machines=machines,
                                     touch_timeout=touch_timeout,
                                     stdout_logfile=stdout_logfile,
                                     stderr_logfile=stderr_logfile,
                                     max_fail_count=max_fail_count)
        assert dpid
        process = server.get_process(program_id=dpid)
        assert process["id"] == dpid
        assert process["name"] == name
        assert process["command"] == command
        assert process["directory"] == directory
        assert process["environment"] == environment
        assert process["auto_start"] == auto_start
        assert process["auto_restart"] == auto_restart
        assert process["machines"] == machines
        assert process["touch_timeout"] == touch_timeout
        assert process["stdout_logfile"] == stdout_logfile
        assert process["stderr_logfile"] == stderr_logfile
        assert process["max_fail_count"] == max_fail_count
        assert server.stop_process(program_id=dpid)  # cleanup

    def test_directory(self):
        """Create a process whose command compares `pwd` with tmp_for_test."""
        name = "test_directory"
        command = "test `pwd` = %s && sleep 60" % tmp_for_test
        dpid = server.create_process(name, command,
                                     directory=tmp_for_test,
                                     stdout_logfile="test_directory.log",
                                     stderr_logfile="test_directory.log")
        assert dpid
        assert server.stop_process(program_id=dpid)  # cleanup

    def test_environment(self):
        """Create a process whose command checks the AAA and BBB environment variables."""
        name = "test_environment"
        # The spaces around the second '=' are part of the input being tested.
        environment = "AAA=aaa; BBB = bbb"
        command = "test $AAA = aaa && test $BBB = bbb && sleep 60"
        dpid = server.create_process(name, command,
                                     directory=tmp_for_test,
                                     environment=environment,
                                     stdout_logfile="test_environment.log",
                                     stderr_logfile="test_environment.log")
        assert dpid
        assert server.stop_process(program_id=dpid)  # cleanup

    def test_logfile(self):
        """Check that stdout and stderr of the command land in their own log files."""
        name = "test_logfile"
        command = "echo stdout && echo stderr 1>&2 && sleep 60"
        dpid = server.create_process(name, command,
                                     directory=tmp_for_test,
                                     stdout_logfile="test_logfile_stdout.log",
                                     stderr_logfile="test_logfile_stderr.log")
        assert dpid
        assert server.stop_process(program_id=dpid)  # cleanup
        # NOTE(review): indentation was lost in this excerpt; the `else`
        # clauses below are assumed to belong to the `for` loops (fail only
        # when no line matched) - confirm against the original file.
        with open(os.path.join(tmp_for_test, "test_logfile_stdout.log"),
                  encoding="utf-8") as fp:
            for line in fp:
                if line.strip() == "stdout":
                    break
            else:
                assert False
        with open(os.path.join(tmp_for_test, "test_logfile_stderr.log"),
                  encoding="utf-8") as fp:
            for line in fp:
                if line.strip() == "stderr":
                    break
            else:
                assert False

    def test_machines(self):
        """A local machine list yields an id; "wrong_machine" yields an empty string."""
        name = "test_machines"
        command = "sleep 60"
        machines = "localhost;127.0.0.1"
        dpid = server.create_process(name, command,
                                     directory=tmp_for_test,
                                     machines=machines,
                                     stdout_logfile="test_machines.log",
                                     stderr_logfile="test_machines.log")
        assert dpid
        assert server.stop_process(program_id=dpid)  # cleanup
        name = "test_machines"
        command = "sleep 60"
        machines = "wrong_machine"
        dpid = server.create_process(name, command,
                                     directory=tmp_for_test,
                                     machines=machines,
                                     stdout_logfile="test_machines.log",
                                     stderr_logfile="test_machines.log")
        assert dpid == ""

    def test_auto_start(self):
        """A process created with auto_start=False is reported as STOPPED."""
        name = "test_auto_start"
        command = "sleep 60"
        dpid = server.create_process(name, command,
                                     auto_start=False,
                                     directory=tmp_for_test,
                                     stdout_logfile="test_auto_start.log",
                                     stderr_logfile="test_auto_start.log")
        assert dpid
        assert server.get_process(program_id=dpid)["status"] == STATUS.STOPPED

    def test_auto_restart(self):
        # Placeholder: no assertions yet.
        pass

    def test_touch_timeout(self):
        # Placeholder: no assertions yet.
        pass

    def test_max_fail_count(self): ...  # body is cut off in this excerpt

Full Screen

Full Screen

test.py

Source:test.py Github

copy

Full Screen

# ... (excerpt starts at line 60 of test.py, inside a test method whose
# header, enclosing class and imports are not shown)
        processor = ProcessRunner(processes, ProcessorType.FCFS)

        processor.run()
        print("PROCESSOR FCFS GANT CHART: "+str(processor.gant_chart))
        self.assertEqual(processor.get_process(1).completion_time, 17)
        self.assertEqual(processor.get_process(1).turn_around_time, 8)
        self.assertEqual(processor.get_process(1).waiting_time, 0)
        self.assertEqual(processor.get_process(2).completion_time, 5)
        self.assertEqual(processor.get_process(2).turn_around_time, 5)
        self.assertEqual(processor.get_process(2).waiting_time, 0)
        self.assertEqual(processor.get_process(3).completion_time, 8)
        self.assertEqual(processor.get_process(3).turn_around_time, 8)
        self.assertEqual(processor.get_process(3).waiting_time, 5)
        self.assertEqual(processor.get_process(4).completion_time, 23)
        self.assertEqual(processor.get_process(4).turn_around_time, 13)
        self.assertEqual(processor.get_process(4).waiting_time, 7)
        self.assertEqual(len(processor.gant_chart), 23)

    def test_rr(self):
        """Run the processes from processinfo.csv with ProcessorType.RR and a third argument of 2."""
        # NOTE(review): the third ProcessRunner argument is presumably the
        # round-robin time quantum - confirm against ProcessRunner.
        processes: list[Process] = getProcesses("processinfo.csv")
        processor = ProcessRunner(processes, ProcessorType.RR, 2)

        processor.run()
        print("PROCESSOR RR GANT CHART: "+str(processor.gant_chart))
        # pid 3
        self.assertEqual(processor.get_process(3).completion_time, 7)
        self.assertEqual(processor.get_process(3).turn_around_time, 7)
        self.assertEqual(processor.get_process(3).waiting_time, 4)
        # pid 2
        self.assertEqual(processor.get_process(2).completion_time, 8)
        self.assertEqual(processor.get_process(2).turn_around_time, 8)
        self.assertEqual(processor.get_process(2).waiting_time, 3)

        # pid 1
        self.assertEqual(processor.get_process(1).completion_time, 23)
        self.assertEqual(processor.get_process(1).turn_around_time, 14)
        self.assertEqual(processor.get_process(1).waiting_time, 6)
        # pid 4
        self.assertEqual(processor.get_process(4).completion_time, 21)
        self.assertEqual(processor.get_process(4).turn_around_time, 11)
        self.assertEqual(processor.get_process(4).waiting_time, 5)
        # The chart is expected to hold 23 entries for this input.
        self.assertEqual(len(processor.gant_chart), 23)

    def test_ps(self):
        """Run the processes from processinfo.csv with ProcessorType.PS and check per-pid timings."""
        processes: list[Process] = getProcesses("processinfo.csv")
        processor = ProcessRunner(processes, ProcessorType.PS)

        processor.run()
        print("PROCESSOR PS GANT CHART: "+str(processor.gant_chart))
        # pid 1
        self.assertEqual(processor.get_process(1).completion_time, 17)
        self.assertEqual(processor.get_process(1).turn_around_time, 8)
        self.assertEqual(processor.get_process(1).waiting_time, 0)
        # pid 2
        self.assertEqual(processor.get_process(2).completion_time, 8)
        self.assertEqual(processor.get_process(2).turn_around_time, 8)
        self.assertEqual(processor.get_process(2).waiting_time, 3)
        # pid 3
        self.assertEqual(processor.get_process(3).completion_time, 3)
        self.assertEqual(processor.get_process(3).turn_around_time, 3)
        self.assertEqual(processor.get_process(3).waiting_time, 0)

        # pid 4
        self.assertEqual(processor.get_process(4).completion_time, 23)
        self.assertEqual(processor.get_process(4).turn_around_time, 13)
        self.assertEqual(processor.get_process(4).waiting_time, 7)
        self.assertEqual(len(processor.gant_chart), 23)

    def test_fcfs_2(self):
        """Run the processes from processinfotest.csv with ProcessorType.FCFS and check timings and averages."""
        processes: list[Process] = getProcesses("processinfotest.csv")
        processor = ProcessRunner(processes, ProcessorType.FCFS)

        processor.run()
        print("PROCESSOR FCFS 2 GANT CHART: "+str(processor.gant_chart))
        # pid 1
        self.assertEqual(processor.get_process(1).completion_time, 10)
        self.assertEqual(processor.get_process(1).turn_around_time, 10)
        self.assertEqual(processor.get_process(1).waiting_time, 0)
        # pid 2
        self.assertEqual(processor.get_process(2).completion_time, 15)
        self.assertEqual(processor.get_process(2).turn_around_time, 15)
        self.assertEqual(processor.get_process(2).waiting_time, 10)
        # pid 3
        self.assertEqual(processor.get_process(3).completion_time, 23)
        self.assertEqual(processor.get_process(3).turn_around_time, 23)
        self.assertEqual(processor.get_process(3).waiting_time, 15)
        # Expected average of the three waiting times above: (0 + 10 + 15) / 3.
        # NOTE(review): this is an exact float comparison; assertAlmostEqual
        # would be more tolerant of how average_wt() does its arithmetic.
        self.assertEqual(processor.average_wt(),8+(1.0/3.0))
        # Expected average of the three turn-around times above: (10 + 15 + 23) / 3.
        self.assertEqual(processor.average_ttt(),16)


if __name__ == '__main__':
    unittest.main()
# ... (excerpt ends here)

Full Screen

Full Screen

ks.py

Source:ks.py Github

copy

Full Screen

# ... (excerpt starts at line 3 of ks.py; earlier lines are not shown)
import getpass
import io
import os
import subprocess
import time


def _current_user():
    """Return the name of the user running this script.

    ``os.getlogin()`` needs a controlling terminal and raises ``OSError``
    when the script is started from cron, which is how ``isalive`` is meant
    to be scheduled (see the crontab note further down), so
    ``getpass.getuser()`` is used instead.
    """
    try:
        return getpass.getuser()
    except (KeyError, OSError):
        # No user-name environment variable and no passwd entry for this uid:
        # fall back to the numeric uid.
        return str(os.getuid())


CURRENT_USER = _current_user()
UWSGI_USER = CURRENT_USER  # system user that owns the uwsgi code on Linux
NGINX_USER = CURRENT_USER  # system user that owns the nginx code on Linux


def get_process(name, filter, user=None):
    """Scan ``ps -ef`` output and return the first truthy ``filter(line)``.

    Only lines that contain ``name`` (and ``user``, when given, to cope with
    multi-user hosts) are passed to ``filter``.  Both are matched as plain
    substrings.

    :param name: process name to look for
    :param filter: callable taking one ``ps -ef`` line and returning a truthy
        value (the pid string for the filters in this file) or a falsy one.
        The parameter name shadows the builtin but is kept for callers.
    :param user: optional system user name the line must also contain
    :return: the first truthy value returned by ``filter``, or ``None`` when
        no line matches or ``ps`` cannot be run
    """
    try:
        # No shell and no grep pipeline: the grep processes can no longer
        # show up in their own output, and name/user are never interpreted
        # by a shell.
        completed = subprocess.run(
            ['ps', '-ef'],
            stdout=subprocess.PIPE,
            universal_newlines=True,
            errors='replace',
        )
    except OSError:
        # ps is missing or not executable: behave as "nothing found", like
        # the empty pipe output did before.
        return None

    for line in completed.stdout.splitlines(keepends=True):
        if name not in line:
            continue
        if user and user not in line:
            continue
        found = filter(line)  # call the filter once per line
        if found:
            return found
    return None


def _pid_for_marker(line, marker):
    """Shared body of the ``nginx``/``uwsgi`` line filters.

    Returns ``None`` unless ``marker`` occurs in ``line``.  Otherwise, if the
    third whitespace-separated field (PPID in ``ps -ef`` output) is ``'1'``
    the second field (PID) is returned; if not, the third field - the
    parent's PID - is returned.
    """
    if marker not in line:
        return None
    fields = line.split()
    if len(fields) < 3:
        # Not a full ps -ef row; avoid an IndexError.
        return None
    pid, ppid = fields[1], fields[2]
    return pid if ppid == '1' else ppid


def nginx(x):
    """Line filter for nginx: pid lookup on lines containing ``'nginx:'``."""
    return _pid_for_marker(x, 'nginx:')


def uwsgi(x):
    """Line filter for uwsgi: pid lookup on lines containing ``'wsgi.xml'``."""
    return _pid_for_marker(x, 'wsgi.xml')


def start():
    """Start nginx and then uwsgi, skipping whichever is already running."""
    print('启动 nginx ...')
    if get_process('nginx', nginx, user=NGINX_USER):
        print('nginx 已经启动')
    else:
        os.system(
            '%s/apps/nginx/sbin/nginx -c %s/apps/nginx/conf/nginx.conf' % (os.environ['HOME'], os.environ['HOME']))
        print('nginx 启动完毕')
    print('启动 web服务')
    if get_process('uwsgi', uwsgi, user=UWSGI_USER):
        print('web服务已经启动')
    else:
        os.system('uwsgi -x %s/src/ywt_flask/etc/uwsgi.xml' % (os.environ['HOME']))
        print('web服务 启动完毕')


def stop():
    """Kill uwsgi with ``kill -9`` if a pid is found; nginx is left alone."""
    pid = get_process('uwsgi', uwsgi, user=UWSGI_USER)
    if pid:
        print('uwsgi pid ' + pid)
        os.system('kill -9 ' + pid)
        print('web服务关闭成功')
    else:
        print('web服务未启动,不需关闭')


def stopall():
    """Stop uwsgi via ``stop()`` and then send ``kill -15`` to nginx."""
    stop()
    pid = get_process('nginx', nginx, user=NGINX_USER)
    if pid:
        print('nginx pid ' + pid)
        os.system('kill -15 ' + pid)
        print('nginx 关闭成功')
    else:
        print('nginx未启动,不需要关闭')


def restart():
    """Stop uwsgi, wait two seconds, then run ``start()``."""
    stop()
    time.sleep(2)
    start()


def restartall():
    """Stop uwsgi and nginx, wait two seconds, then run ``start()``."""
    stopall()
    time.sleep(2)
    start()


# Check whether uwsgi is running and restart it once if it is not.
# */2 * * * * /home/hxkh/src/backplat/crontab/runpy.sh ks.py isalive
# runpy.sh must be executable.
# Meaning of the crontab line: run `ks.py isalive` every 2 minutes.
def isalive():
    """Restart the web service when no uwsgi pid can be found."""
    pid = get_process('uwsgi', uwsgi, user=UWSGI_USER)
    if pid:
        print('web服务已启动,不需要重启')
    else:
        print('web服务未启动,尝试重启')
        restart()


if __name__ == '__main__':
    import sys

    # The last command-line argument selects the action.
    n = sys.argv[-1]
    if n.lower() == 'start':
        start()
    elif n.lower() == 'stop':
        stop()
    elif n.lower() == 'stopall':
        stopall()
    # ... (the excerpt is cut off here; any further branches are not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful