How to use processcommand method in tox

Best Python code snippet using tox_python

checkIfRunNeedsProcessing.py

Source:checkIfRunNeedsProcessing.py Github

copy

Full Screen

1#!/usr/bin/env python2"""3This script should check if a telem run needs processing and if it does do the appropriate processing.4"""5import sys6import os7import subprocess8anitaTelemDir = os.getenv('ANITA_TELEM_DIR', "/home/radio/anita14/anitaTelem")9anitaTelemDataDir = os.getenv('ANITA_TELEM_DATA_DIR', "/anitaStorage2/antarctica14/telem")10site="antarctica"11def doesRawDirExist(runNum):12 rawDir=anitaTelemDataDir+"/raw/run"+str(runNum)13 if(os.path.isdir(rawDir)):14 return 115 return 016def doesRootDirExist(runNum):17 rawDir=anitaTelemDataDir+"/root/run"+str(runNum)18 if(os.path.isdir(rawDir)):19 return 120 return 021def getRawTimeModified(runNum,filePath):22 fullDir=anitaTelemDataDir+"/raw/run"+str(runNum)+"/"+filePath23 if(os.path.isfile(fullDir)):24 return os.path.getmtime(fullDir)25 return 0;26def getRootFilename(runNum,fileName):27 return anitaTelemDataDir+"/root/run"+str(runNum)+"/"+fileName+str(runNum)+".root"28def getRootTimeModified(runNum,fileName): 29 rootFile=getRootFilename(runNum,fileName)30 if(os.path.isfile(rootFile)):31 return os.path.getmtime(rootFile)32 return 133def main():34 35 if(len(sys.argv)<2):36 print __file__+' <run>'37 sys.stdout.flush()38 sys.exit()39 runNum=sys.argv[1]40 41#Step one check raw dir exists42 if not doesRawDirExist(runNum):43 print "Raw dir does not exist for run ",runNum," <- nothing to do ... exitting script"44 sys.stdout.flush()45 sys.exit()46#Step two check root dir exits47 if not doesRootDirExist(runNum):48 print "Root dir does not exist for run ",runNum," <- generating root files"49 sys.stdout.flush()50 processCommand="runAnita4FileMakerTelem.sh "+str(runNum)51 print "--> [1 of 2] - running ",processCommand52 sys.stdout.flush()53 subprocess.call([processCommand],shell=True)54 processCommand="processRunTelem.sh "+str(runNum)55 print "--> [1 of 2] - running ",processCommand56 sys.stdout.flush()57 subprocess.call([processCommand],shell=True)58 print "--> root dir generated ... exitting script" 59 sys.stdout.flush()60 sys.exit()61#Step three check if individual root files need to be remade62 rawDir=anitaTelemDataDir+"/raw/run"+str(runNum) 63 rootDir=anitaTelemDataDir+"/root/run"+str(runNum)64 65 rawTime=getRawTimeModified(runNum,"event/last")66 rootTime=getRootTimeModified(runNum,"eventFile")67 if(rawTime>rootTime):68 print "Need new event ROOT file for run ",runNum," - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on event/last"69 sys.stdout.flush()70 processCommand="runTelemEventMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir71 print "--> [1 of 4] - running ",processCommand72 sys.stdout.flush()73 subprocess.call([processCommand],shell=True)74 processCommand="runTelemHeaderMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir75 print "--> [2 of 4] - running ",processCommand76 sys.stdout.flush()77 subprocess.call([processCommand],shell=True)78 #Here insert call to aware file maker 79 processCommand="makeHeaderJsonFiles"80 print "--> [3 of 4] - running ",processCommand81 sys.stdout.flush()82 subprocess.call([processCommand,getRootFilename(runNum,"headFile")])83 #Now make the waveform summary files84 processCommand="makeWaveformSummaryJsonFiles"85 print "--> [4 of 4] - running ",processCommand86 sys.stdout.flush()87 subprocess.call([processCommand,getRootFilename(runNum,"eventHeadFile"),getRootFilename(runNum,"eventFile")])88 else:89 print "event ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on event/last"90 sys.stdout.flush()91 92 rawTime=getRawTimeModified(runNum,"house/hk/last")93 rootTime=getRootTimeModified(runNum,"hkFile")94 if(rawTime>rootTime):95 print "Need new hk ROOT file for run ",runNum," - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/hk/last"96 sys.stdout.flush()97 processCommand="runTelemHkMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir98 print "--> [1 of 3] - running ",processCommand99 sys.stdout.flush()100 subprocess.call([processCommand],shell=True)101 #Here insert call to aware file maker102 processCommand="makeHkJsonFiles"103 print "--> [2 of 3] - running ",processCommand104 sys.stdout.flush()105 subprocess.call([processCommand,getRootFilename(runNum,"hkFile")])106 #Here insert call to aware file maker107 processCommand="makeSSHkJsonFiles"108 print "--> [3 of 3] - running ",processCommand109 sys.stdout.flush()110 subprocess.call([processCommand,getRootFilename(runNum,"sshkFile")])111 else:112 print "hk ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/hk/last"113 sys.stdout.flush()114 rawTime=getRawTimeModified(runNum,"house/monitor/last")115 rootTime=getRootTimeModified(runNum,"monitorFile")116 if(rawTime>rootTime):117 print "Need new monitor ROOT file for run ",runNum," - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/monitor/last"118 sys.stdout.flush()119 processCommand="runTelemMonitorMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir120 print "--> [1 of 2] - running ",processCommand121 sys.stdout.flush()122 subprocess.call([processCommand],shell=True)123 processCommand="runTelemOtherMonitorMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir124 print "--> [2 of 4] - running ",processCommand125 sys.stdout.flush()126 subprocess.call([processCommand],shell=True)127 #Here insert call to aware file maker128 processCommand="makeMonitorHkJsonFiles"129 print "--> [3 of 4] - running ",processCommand130 sys.stdout.flush()131 subprocess.call([processCommand,getRootFilename(runNum,"monitorFile")])132 processCommand="makeOtherMonitorHkJsonFiles"133 print "--> [4 of 4] - running ",processCommand134 sys.stdout.flush()135 subprocess.call([processCommand,getRootFilename(runNum,"monitorFile")])136 else:137 print "monitor ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/monitor/last"138 sys.stdout.flush()139 rawTime=getRawTimeModified(runNum,"house/gpu/last")140 rootTime=getRootTimeModified(runNum,"gpuFile")141 if(rawTime>rootTime):142 print "Need new gpu ROOT file for run ",runNum," - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/gpu/last"143 sys.stdout.flush()144 processCommand="runTelemGpuMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir145 print "--> [1 of 2] - running ",processCommand146 sys.stdout.flush()147 subprocess.call([processCommand],shell=True)148 #Here insert call to aware file maker149 processCommand="makeGpuPowerSpectrumImages"150 print "--> [2 of 2] - running ",processCommand151 sys.stdout.flush()152 subprocess.call([processCommand,getRootFilename(runNum,"gpuFile")])153 else:154 print "gpu ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/gpu/last"155 sys.stdout.flush()156 rawTime=getRawTimeModified(runNum,"house/tuff/last")157 rootTime=getRootTimeModified(runNum,"tuffStatusFile")158 if(rawTime>rootTime):159 print "Need new tuff ROOT file for run ",runNum," - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/tuff/last"160 sys.stdout.flush()161 processCommand="runTelemTuffStatusMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir162 print "--> [1 of 2] - running ",processCommand163 sys.stdout.flush()164 subprocess.call([processCommand],shell=True)165 #Here insert call to aware file maker166 processCommand="makeTuffStatusJsonFiles"167 print "--> [2 of 2] - running ",processCommand168 sys.stdout.flush()169 subprocess.call([processCommand,getRootFilename(runNum,"tuffStatusFile")])170 else:171 print "tuff ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/tuff/last"172 sys.stdout.flush()173 rawTime=getRawTimeModified(runNum,"house/turfhk/last")174 rootTime=getRootTimeModified(runNum,"turfRateFile")175 if(rawTime>rootTime):176 print "Need new monitor ROOT file for run ",runNum," - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/turfhk/last"177 sys.stdout.flush()178 processCommand="runTelemTurfRateMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir179 print "--> [1 of 2] - running ",processCommand180 sys.stdout.flush()181 subprocess.call([processCommand],shell=True)182 processCommand="makeTurfRateJsonFiles"183 print "--> [2 of 2] - running ",processCommand184 sys.stdout.flush()185 subprocess.call([processCommand,getRootFilename(runNum,"turfRateFile")])186 else:187 print "Turf Rate ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/turfhk/last"188 sys.stdout.flush()189 190 rawTime=getRawTimeModified(runNum,"house/turfhk/last")191 rootTime=getRootTimeModified(runNum,"sumTurfRateFile")192 if(rawTime>rootTime):193 print "Need new monitor ROOT file for run ",runNum," - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/turfhk/last"194 sys.stdout.flush()195 processCommand="runTelemSumTurfRateMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir196 print "--> [1 of 2] - running ",processCommand197 sys.stdout.flush()198 subprocess.call([processCommand],shell=True)199 processCommand="makeSumTurfRateJsonFiles"200 print "--> [2 of 2] - running ",processCommand201 sys.stdout.flush()202 subprocess.call([processCommand,getRootFilename(runNum,"sumTurfRateFile")])203 else:204 print "SummedTurfRate ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/turfhk/last"205 sys.stdout.flush()206 rawTime=getRawTimeModified(runNum,"house/surfhk/last")207 rootTime=getRootTimeModified(runNum,"surfHkFile")208 if(rawTime>rootTime):209 print "Need new surf hk ROOT file for run ",runNum," - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/surfhk/last"210 sys.stdout.flush()211 processCommand="runTelemSurfHkMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir212 print "--> [1 of 4] - running ",processCommand213 sys.stdout.flush()214 subprocess.call([processCommand],shell=True)215 processCommand="runTelemAvgSurfHkMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir216 print "--> [2 of 4] - running ",processCommand217 sys.stdout.flush()218 subprocess.call([processCommand],shell=True)219 processCommand="makeSurfHkJsonFiles"220 print "--> [3 of 4] - running ",processCommand221 sys.stdout.flush()222 subprocess.call([processCommand,getRootFilename(runNum,"surfHkFile")])223 processCommand="makeAvgSurfHkJsonFiles"224 print "--> [4 of 4] - running ",processCommand225 sys.stdout.flush()226 subprocess.call([processCommand,getRootFilename(runNum,"avgSurfHkFile")])227 else:228 print "surf hk ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/surfhk/last"229 sys.stdout.flush()230 rawTime=getRawTimeModified(runNum,"start/last")231 rootTime=getRootTimeModified(runNum,"auxFile")232 if(rawTime>rootTime):233 print "Need new aux ROOT file for run ",runNum," - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on start/last"234 sys.stdout.flush()235 processCommand="runTelemAuxMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir236 print "--> [1 of 2] - running ",processCommand237 sys.stdout.flush()238 subprocess.call([processCommand],shell=True)239 processCommand="makeAcqdStartRunJsonFiles"240 print "--> [2 of 2] - running ",processCommand241 sys.stdout.flush()242 subprocess.call([processCommand,getRootFilename(runNum,"auxFile")])243 else:244 print "aux ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on start/last"245 sys.stdout.flush()246 rawTime=getRawTimeModified(runNum,"house/gps/last")247 rootTime=getRootTimeModified(runNum,"gpsFile")248 if(rawTime>rootTime):249 print "Need new GPS ROOT file"250 processCommand="runTelemGpsMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir251 print "--> [1 of 12] - running ",processCommand252 sys.stdout.flush()253 subprocess.call([processCommand],shell=True)254 processCommand="makeAdu5PatJsonFiles"255 print "--> [2 of 12] - running ",processCommand256 sys.stdout.flush()257 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile"),"0"])258 processCommand="makeAdu5PatJsonFiles"259 print "--> [3 of 12] - running ",processCommand260 sys.stdout.flush()261 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile"),"1"])262 processCommand="makeAdu5SatJsonFiles"263 print "--> [4 of 12] - running ",processCommand264 sys.stdout.flush()265 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile"),"0"])266 processCommand="makeAdu5SatJsonFiles"267 print "--> [5 of 12] - running ",processCommand268 sys.stdout.flush()269 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile"),"1"])270 processCommand="makeAdu5VtgJsonFiles"271 print "--> [6 of 12] - running ",processCommand272 sys.stdout.flush()273 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile"),"0"])274 processCommand="makeAdu5VtgJsonFiles"275 print "--> [7 of 12] - running ",processCommand276 sys.stdout.flush()277 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile"),"1"]) 278 processCommand="makeG12PosJsonFiles"279 print "--> [8 of 12] - running ",processCommand280 sys.stdout.flush()281 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile")])282 processCommand="makeG12SatJsonFiles"283 print "--> [9 of 12] - running ",processCommand284 sys.stdout.flush()285 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile")])286 processCommand="makeGpsGgaJsonFiles"287 print "--> [10 of 12] - running ",processCommand288 sys.stdout.flush()289 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile"),"0"])290 processCommand="makeGpsGgaJsonFiles"291 print "--> [11 of 12] - running ",processCommand292 sys.stdout.flush()293 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile"),"1"])294 processCommand="makeGpsGgaJsonFiles"295 print "--> [12 of 12] - running ",processCommand296 sys.stdout.flush()297 subprocess.call([processCommand,getRootFilename(runNum,"gpsFile"),"2"])298 else:299 print "GPS ROOT file for run ",runNum," up-to-date - rawTime = ",rawTime," / rootTime = ",rootTime," <- from check on house/gps/last"300 sys.stdout.flush()301 rawTime=getRawTimeModified(runNum,"house/slowRate/last")302 rootTime=getRootTimeModified(runNum,"slowFile")303 if(rawTime>rootTime):304 print "Need new Slow ROOT file"305 processCommand="runTelemSlowMaker.sh "+str(runNum)+" "+rawDir+" "+rootDir306 print "--> [1 of 2] - running ",processCommand307 sys.stdout.flush()308 subprocess.call([processCommand],shell=True)309 processCommand="makeSlowRateHkJsonFiles"310 print "--> [2 of 3] - running ",processCommand311 sys.stdout.flush()312 subprocess.call([processCommand,getRootFilename(runNum,"slowFile")])313 processCommand="addRunToMapFile"314 print "--> [3 of 3] - running ",processCommand315 sys.stdout.flush()316 subprocess.call([processCommand,getRootFilename(runNum,"slowFile")])317 if(1):318 print "Reprocess config file"319 processCommand="processConfigTelem.sh"320 print "--> [1 of 1] - running ",processCommand321 subprocess.call([processCommand,str(runNum)])322 sys.stdout.flush()323if __name__ == "__main__":...

Full Screen

Full Screen

test_commands.py

Source:test_commands.py Github

copy

Full Screen

1import pytest2import GodMachine.commands as cmds3import GodMachine.bot as bot4@pytest.mark.asyncio5@pytest.mark.parametrize("command", ["/test", "/refuse", "/pass", "/impossible", "/care", "<@!> SITREP"])6async def test_simpleReplies(command):7 prompt_admin = bot.CommandPrompt(command, "Test UserName", True, None, None)8 result_admin = await bot.processCommand(prompt_admin)9 assert result_admin != None10 11 prompt_user = bot.CommandPrompt(command, "Test UserName", False, None, None)12 result_user = await bot.processCommand(prompt_user)13 assert result_user != None14@pytest.mark.asyncio15@pytest.mark.parametrize("command", ["/r 1", "/roll 0", "roll 10000", "roll 151", "/r 20 r", "/r 20 9a", "/r 20 8a", "/r 20 no10", "/r 20 adv", "/r 20 blessed", "/r 5 blighted", 16 "/r 20 3e", "/r 20 1e", "/r 20 r 9a 1e", "roll chance r", "roll coin", "roll %", "roll d12", "roll init 4", "roll chance", "roll", "roll tarot",17 "roll letter"])18async def test_roll(command):19 prompt = bot.CommandPrompt(command, "Test UserName", True, None, None)20 for i in range(100):21 result = await bot.processCommand(prompt)22 assert result != None23@pytest.mark.asyncio24@pytest.mark.parametrize("command", ["/init summary", "/init 4", "/init -20", "/initiative 4", "/init 4 char exampleCharacter", "/init 4 mod -2 char exampleCharacter", "/init mod exampleCharacter",25 "/init insert 16", "/init mod -2", "/init insert char bob", "/init help", "/init 4 noclean", "/init 4 nosummary", "/init summary", "/init clear", "/init",26 "/init 5 6", "/init nosummary", "/init mod -2 char nonExistentCharacter"])27async def test_init(command):28 prompt = bot.CommandPrompt(command, "Test UserName", True, None, None)29 for i in range(100):30 result = await bot.processCommand(prompt)31 assert result != None32@pytest.mark.asyncio33async def test_tarot():34 prompt = bot.CommandPrompt("/tarot", "Test UserName", True, None, None)35 for i in range(100):36 result = await bot.processCommand(prompt)37 assert result != None38@pytest.mark.asyncio39async def test_coinflip():40 prompt = bot.CommandPrompt("/coinflip", "Test UserName", True, None, None)41 for i in range(100):42 result = await bot.processCommand(prompt)43 assert result != None44@pytest.mark.asyncio45@pytest.mark.parametrize(("command", "admin"), [("/corrupt", True), ("/corruption 1", True), ("/corrupt 0.5", True), ("/corrupt .5", True), ("/corruption foo", True), ("/corrupt 2", True), ("/corrupt .5", False), ("/corruption 0", True)])46async def test_corruption(command, admin):47 prompt_corrupt = bot.CommandPrompt(command, "Test UserName", admin, None, None)48 result_corrupt = await bot.processCommand(prompt_corrupt)49 assert result_corrupt != None50 prompt_roll = bot.CommandPrompt("/roll 8", "Test UserName", admin, None, None)51 result_roll = await bot.processCommand(prompt_roll)52 assert result_roll != None53@pytest.mark.asyncio54@pytest.mark.parametrize("command", ["/extend 4", "/extended 10", "/extend", "/extend 4+1", "/extend 4 r", "/extend 4 no10", "/extend 4 8a", "/extend 4 patient", "/extend 4 fumble", "/extend 4 p5", "/extend 4 f-2"])55async def test_extend(command):56 prompt = bot.CommandPrompt(command, "Test UserName", True, None, None)57 result = await bot.processCommand(prompt)58 assert result != None59@pytest.mark.asyncio60@pytest.mark.parametrize(("command", "admin"), [("/cleanup", False), ("/cleanup", True), ("/cleanup all", True), ("/cleanup 1w", True), ("/cleanup 12d", True), ("/cleanup 24h", True), ("/cleanup 6m", True), ("/cleanup 1s", True),61 ("/cleanup 1w12d3h2m1s", True)])62async def test_cleanup(command, admin):63 prompt = bot.CommandPrompt(command, "Test UserName", admin, None, None)64 result = await bot.processCommand(prompt)65 assert result != None66@pytest.mark.asyncio67@pytest.mark.parametrize("command", ["/roll d12", "/roll 6d12", "/roll 0d12", "/roll 2d0", "/roll d12+8", "/roll d12-d4", "/roll d12+4d6-5", "/roll d12+4d6-5", "/roll d12+-4d6-5"])68async def test_rolldice(command):69 prompt = bot.CommandPrompt(command, "Test UserName", False, None, None)70 result = await bot.processCommand(prompt)71 assert result != None72@pytest.mark.asyncio73@pytest.mark.parametrize(("str", "echo"), [("normal", "normal"), ("with space", "with space"), (" leading space", "leading space"), ("trailing space ", "trailing space"), ("break\r\nline ", "break\r\nline"), ("", ""), (" ", "")])74async def test_echo(str, echo):75 prompt = bot.CommandPrompt("/echo " + str, "Test UserName", False, None, None)76 result = await bot.processCommand(prompt)77 assert result.message == echo78@pytest.mark.asyncio79@pytest.mark.parametrize(("str", "echo"), [("normal", "normal"), ("with space", "with space"), (" leading space", "leading space"), ("trailing space ", "trailing space"), ("break\r\nline ", "break\r\nline"), ("", ""), (" ", "")])80async def test_echoSoft(str, echo):81 prompt = bot.CommandPrompt("/echosoft " + str, "Test UserName", False, None, None)82 result = await bot.processCommand(prompt)83 assert result.message == echo84@pytest.mark.asyncio85@pytest.mark.parametrize("command", ["/choose a b", "/choice a", "/choose"])86async def test_choose(command):87 prompt = bot.CommandPrompt(command, "Test UserName", False, None, None)88 result = await bot.processCommand(prompt)89 assert result != None90@pytest.mark.asyncio91@pytest.mark.parametrize(("command", "response"), [("/customCommandTest", "root response"), ("/customCommandTest param1a", "1a response"), ("/customCommandTest param1a derp", "1a default"), ("/customCommandTest param1a param2b", "1a 2b response"), ("/customCommandTest param1b", "1b default: param1b"), ("normalText", None)])92async def test_customCommands(command, response):93 prompt = bot.CommandPrompt(command, "Test UserName", False, None, None)94 result = await bot.processCommand(prompt)95 assert (result == None and response == None) or (result.message == response)96@pytest.mark.asyncio97@pytest.mark.parametrize(("command", "responseValid"), [("/thiemo", True), ("/potg", True), ("/", False), ("", False)])98async def test_soundboardCommand(command, responseValid):99 prompt = bot.CommandPrompt("/thiemo", "Test UserName", True, None, None)100 result = await bot.processCommand(prompt)101 assert not responseValid or (result != None and result.succeeded() == True)102@pytest.mark.asyncio103@pytest.mark.parametrize(("command"), ["/thiemo", "/potg"])104async def test_soundboardCommand_sequential(command):105 prompt1 = bot.CommandPrompt(command, "Test UserName", True, None, None)106 result1 = await bot.processCommand(prompt1)107 assert result1 != None108 prompt2 = bot.CommandPrompt(command, "Test UserName", True, None, None)109 result2 = await bot.processCommand(prompt2)110 assert result2 != None111@pytest.mark.asyncio112async def test_stopSoundboardCommand():113 prompt1 = bot.CommandPrompt("/thiemo", "Test UserName", True, None, None)114 command1 = bot.processCommand(prompt1)115 prompt2 = bot.CommandPrompt("/stop", "Test UserName", True, None, None)116 result2 = await bot.processCommand(prompt2)117 result1 = await command1118 assert result1 != None and result1.succeeded() and result2 != None119@pytest.mark.asyncio120async def test_soundlist():121 prompt = bot.CommandPrompt("/soundlist", "Test UserName", True, None, None)122 for i in range(100):123 result = await bot.processCommand(prompt)124 assert result != None125########## KEEP LAST ##########126@pytest.mark.asyncio127@pytest.mark.parametrize(("command", "admin"), [("/shutdown", False), ("/shutdown", True)])128async def test_shutdown(command, admin):129 prompt = bot.CommandPrompt(command, "Test UserName", admin, None, None)130 if admin:131 with pytest.raises(SystemExit) as e:132 await bot.processCommand(prompt)133 assert e.type == SystemExit134 assert e.value.code == 0135 else:136 result = await bot.processCommand(prompt)...

Full Screen

Full Screen

process.py

Source:process.py Github

copy

Full Screen

1from multiprocessing import Process2from constants import ProcessCommand, ProcessEvent3from utils import match_pixel_to_monitor4from utils.math import calculate_line, calculate_distance5def start(sender, receiver):6 """Start the background process."""7 background_process = BackgroundProcess(sender, receiver)8 process = Process(target=background_process.run)9 process.daemon = True10 process.start()11 return process12class BackgroundProcess(object):13 def __init__(self, receiver, sender):14 self.receiver = receiver15 self.sender = sender16 self.command_mapping = {17 ProcessCommand.Pause: self.pause,18 ProcessCommand.Tick: self.tick,19 ProcessCommand.MonitorChanged: self.monitor_change,20 ProcessCommand.MouseMove: self.mouse_mouse,21 ProcessCommand.KeyPressed: self.key_press,22 ProcessCommand.KeyHeld: self.key_held,23 ProcessCommand.KeyReleased: self.key_release,24 ProcessCommand.GamepadButtonPressed: self.gamepad_button_press,25 ProcessCommand.GamepadButtonHeld: self.gamepad_button_held,26 ProcessCommand.GamepadButtonReleased: self.gamepad_button_release,27 ProcessCommand.GamepadThumbL: self.gamepad_thumb_l,28 ProcessCommand.GamepadThumbR: self.gamepad_thumb_r,29 ProcessCommand.GamepadTriggerL: self.gamepad_trigger_l,30 ProcessCommand.GamepadTriggerR: self.gamepad_trigger_r,31 }32 self.ticks = 033 def send_event(self, event, *data):34 self.sender.put((event, data))35 def run(self):36 """Wait for commands and process them."""37 try:38 while True:39 command, data = self.receiver.get()40 self.command_mapping[command](*data)41 # Signal to the main thread that an error occurred42 except Exception as e:43 self.send_event(ProcessEvent.Error)44 raise45 def tick(self, ticks):46 """Keep a count of all the ticks.47 For now, ignore the "total" tick value as it's not needed.48 """49 self.ticks += 150 def pause(self):51 """Reset certain settings on pause."""52 self.mouse_pos = None53 def monitor_change(self, data):54 """Record new monitor data."""55 self.monitor_data = data56 def mouse_mouse(self, pos):57 """Record mouse movement."""58 try:59 prev_mouse_pos = self.mouse_pos60 except AttributeError:61 prev_mouse_pos = None62 self.mouse_pos = pos63 # In certain cases such as the login screen, the position can't be read64 if self.mouse_pos is None:65 if prev_mouse_pos is not None:66 print('Unknown mouse position')67 return68 # Calculate the distance (or speed)69 distance = calculate_distance(self.mouse_pos, prev_mouse_pos)70 if distance:71 self.send_event(ProcessEvent.MouseDistance, distance)72 # Draw a line from the previous position73 pixels = [self.mouse_pos]74 if prev_mouse_pos is not None:75 pixels.extend(calculate_line(self.mouse_pos, prev_mouse_pos))76 pixels.append(prev_mouse_pos)77 # Determine which monitor each pixel is on78 start_monitor = match_pixel_to_monitor(pixels[0], self.monitor_data)79 if len(pixels) == 1:80 end_monitor = start_monitor81 else:82 end_monitor = match_pixel_to_monitor(pixels[-1], self.monitor_data)83 if start_monitor == end_monitor:84 monitor_mapping = {pixel: start_monitor for pixel in pixels}85 else:86 monitor_mapping = {87 pixels[0]: start_monitor,88 pixels[-1]: end_monitor,89 }90 for pixel in pixels[1:-1]:91 monitor_mapping[pixel] = match_pixel_to_monitor(pixel, self.monitor_data)92 for pixel, i in monitor_mapping.items():93 x1, y1, x2, y2 = self.monitor_data[i]94 width = x2 - x195 height = y2 - y196 #mtfile.record_mouse_movement(pixel=pixel, resolution=(width, height), ticks=self.ticks, speed=distance)97 def key_press(self, key, count): pass98 def key_held(self, key): pass99 def key_release(self, key): pass100 def gamepad_button_press(self, gamepad, button, count): pass101 def gamepad_button_held(self, gamepad, button): pass102 def gamepad_button_release(self, gamepad, button): pass103 def gamepad_thumb_l(self, gamepad, pos): pass104 def gamepad_thumb_r(self, gamepad, pos): pass105 def gamepad_trigger_l(self, gamepad, value): pass...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tox automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful