How to use getcmd method in Airtest

Best Python code snippet using Airtest

snmp_lib.py

Source:snmp_lib.py Github

copy

Full Screen

1import sys2##SNMP OID INFORMATION FETCH ## PYSNMP DOCS3def snmpget(oid, snmp_var):4 from pysnmp.entity.rfc3413.oneliner import cmdgen5 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_var6 cmdGen = cmdgen.CommandGenerator()7 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(8 cmdgen.CommunityData(SNMP_COMMUNITY),9 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),10 oid11 )12 # Check for errors and print out results13 if errorIndication:14 print(errorIndication)15 else:16 if errorStatus:17 print('%s at %s' % (18 errorStatus.prettyPrint(),19 errorIndex and varBinds[int(errorIndex) - 1] or '?'20 )21 )22 else:23 for name, val in varBinds:24 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))25 return val26def snmp_inoctet(ifindex, snmp_access):27 oid = str('1.3.6.1.2.1.2.2.1.10.') + str(ifindex)28 from pysnmp.entity.rfc3413.oneliner import cmdgen29 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access30 cmdGen = cmdgen.CommandGenerator()31 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(32 cmdgen.CommunityData(SNMP_COMMUNITY),33 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),34 oid35 )36 # Check for errors and print out results37 if errorIndication:38 print(errorIndication)39 else:40 if errorStatus:41 print('%s at %s' % (42 errorStatus.prettyPrint(),43 errorIndex and varBinds[int(errorIndex) - 1] or '?'44 )45 )46 else:47 for name, val in varBinds:48 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))49 return val50def snmp_outoctet(ifindex, snmp_access):51 oid = str('1.3.6.1.2.1.2.2.1.16.') + str(ifindex)52 from pysnmp.entity.rfc3413.oneliner import cmdgen53 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access54 cmdGen = cmdgen.CommandGenerator()55 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(56 cmdgen.CommunityData(SNMP_COMMUNITY),57 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),58 oid59 )60 # Check for errors and print out results61 if errorIndication:62 print(errorIndication)63 else:64 if errorStatus:65 print('%s at %s' % (66 errorStatus.prettyPrint(),67 errorIndex and varBinds[int(errorIndex) - 1] or '?'68 )69 )70 else:71 for name, val in varBinds:72 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))73 return val74def snmp_intdesr(ifindex, snmp_access):75 #oid = str('1.3.6.1.2.1.2.2.1.2.') + str(ifindex)76 oid = str('1.3.6.1.2.1.31.1.1.1.18.') + str(ifindex)77 from pysnmp.entity.rfc3413.oneliner import cmdgen78 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access79 cmdGen = cmdgen.CommandGenerator()80 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(81 cmdgen.CommunityData(SNMP_COMMUNITY),82 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),83 oid84 )85 # Check for errors and print out results86 if errorIndication:87 print(errorIndication)88 else:89 if errorStatus:90 print('%s at %s' % (91 errorStatus.prettyPrint(),92 errorIndex and varBinds[int(errorIndex) - 1] or '?'93 )94 )95 else:96 for name, val in varBinds:97 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))98 return val99def snmp_intstatus(ifindex, snmp_access):100 oid = str('1.3.6.1.2.1.2.2.1.8.') + str(ifindex)101 from pysnmp.entity.rfc3413.oneliner import cmdgen102 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access103 cmdGen = cmdgen.CommandGenerator()104 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(105 cmdgen.CommunityData(SNMP_COMMUNITY),106 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),107 oid108 )109 # Check for errors and print out results110 if errorIndication:111 print(errorIndication)112 else:113 if errorStatus:114 print('%s at %s' % (115 errorStatus.prettyPrint(),116 errorIndex and varBinds[int(errorIndex) - 1] or '?'117 )118 )119 else:120 for name, val in varBinds:121 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))122 return val123def snmp_intmodified(ifindex, snmp_access):124 oid = str('1.3.6.1.2.1.2.2.1.9.') + str(ifindex)125 from pysnmp.entity.rfc3413.oneliner import cmdgen126 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access127 cmdGen = cmdgen.CommandGenerator()128 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(129 cmdgen.CommunityData(SNMP_COMMUNITY),130 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),131 oid132 )133 # Check for errors and print out results134 if errorIndication:135 print(errorIndication)136 else:137 if errorStatus:138 print('%s at %s' % (139 errorStatus.prettyPrint(),140 errorIndex and varBinds[int(errorIndex) - 1] or '?'141 )142 )143 else:144 for name, val in varBinds:145 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))146 return val147def snmp_cpupercent_sophos(snmp_access):148 #oid = str('1.3.6.1.2.1.2.2.1.9.') + str(ifindex)149 oid = str('1.3.6.1.4.1.21067.2.1.2.2.1')150 from pysnmp.entity.rfc3413.oneliner import cmdgen151 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access152 cmdGen = cmdgen.CommandGenerator()153 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(154 cmdgen.CommunityData(SNMP_COMMUNITY),155 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),156 oid157 )158 # Check for errors and print out results159 if errorIndication:160 print(errorIndication)161 else:162 if errorStatus:163 print('%s at %s' % (164 errorStatus.prettyPrint(),165 errorIndex and varBinds[int(errorIndex) - 1] or '?'166 )167 )168 else:169 for name, val in varBinds:170 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))171 return val172def snmp_diskpercent_sophos(snmp_access):173 oid = str('1.3.6.1.4.1.21067.2.1.2.3.2')174 from pysnmp.entity.rfc3413.oneliner import cmdgen175 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access176 cmdGen = cmdgen.CommandGenerator()177 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(178 cmdgen.CommunityData(SNMP_COMMUNITY),179 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),180 oid181 )182 # Check for errors and print out results183 if errorIndication:184 print(errorIndication)185 else:186 if errorStatus:187 print('%s at %s' % (188 errorStatus.prettyPrint(),189 errorIndex and varBinds[int(errorIndex) - 1] or '?'190 )191 )192 else:193 for name, val in varBinds:194 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))195 return val196def snmp_memorypercent_sophos(snmp_access):197 oid = str('1.3.6.1.4.1.21067.2.1.2.4.2')198 from pysnmp.entity.rfc3413.oneliner import cmdgen199 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access200 cmdGen = cmdgen.CommandGenerator()201 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(202 cmdgen.CommunityData(SNMP_COMMUNITY),203 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),204 oid205 )206 # Check for errors and print out results207 if errorIndication:208 print(errorIndication)209 else:210 if errorStatus:211 print('%s at %s' % (212 errorStatus.prettyPrint(),213 errorIndex and varBinds[int(errorIndex) - 1] or '?'214 )215 )216 else:217 for name, val in varBinds:218 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))219 return val220def snmp_liveusers_sophos(snmp_access):221 oid = str('1.3.6.1.4.1.21067.2.1.2.6')222 from pysnmp.entity.rfc3413.oneliner import cmdgen223 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access224 cmdGen = cmdgen.CommandGenerator()225 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(226 cmdgen.CommunityData(SNMP_COMMUNITY),227 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),228 oid229 )230 # Check for errors and print out results231 if errorIndication:232 print(errorIndication)233 else:234 if errorStatus:235 print('%s at %s' % (236 errorStatus.prettyPrint(),237 errorIndex and varBinds[int(errorIndex) - 1] or '?'238 )239 )240 else:241 for name, val in varBinds:242 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))243 return val244def snmp_ap_adopted(snmp_access):245 oid = str('1.3.6.1.4.1.9.9.618.1.8.4.0')246 from pysnmp.entity.rfc3413.oneliner import cmdgen247 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access248 cmdGen = cmdgen.CommandGenerator()249 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(250 cmdgen.CommunityData(SNMP_COMMUNITY),251 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),252 oid253 )254 # Check for errors and print out results255 if errorIndication:256 print(errorIndication)257 else:258 if errorStatus:259 print('%s at %s' % (260 errorStatus.prettyPrint(),261 errorIndex and varBinds[int(errorIndex) - 1] or '?'262 )263 )264 else:265 for name, val in varBinds:266 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))267 return val268def snmp_total_clients(snmp_access):269 oid = str('1.3.6.1.4.1.9.9.618.1.8.12.0')270 from pysnmp.entity.rfc3413.oneliner import cmdgen271 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access272 cmdGen = cmdgen.CommandGenerator()273 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(274 cmdgen.CommunityData(SNMP_COMMUNITY),275 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),276 oid277 )278 # Check for errors and print out results279 if errorIndication:280 print(errorIndication)281 else:282 if errorStatus:283 print('%s at %s' % (284 errorStatus.prettyPrint(),285 errorIndex and varBinds[int(errorIndex) - 1] or '?'286 )287 )288 else:289 for name, val in varBinds:290 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))291 return val292def snmp_wlan_clients(wlanIndex,snmp_access):293 oid = str('1.3.6.1.4.1.14179.2.1.1.1.38.') + str(wlanIndex)294 from pysnmp.entity.rfc3413.oneliner import cmdgen295 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access296 cmdGen = cmdgen.CommandGenerator()297 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(298 cmdgen.CommunityData(SNMP_COMMUNITY),299 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),300 oid301 )302 # Check for errors and print out results303 if errorIndication:304 print(errorIndication)305 else:306 if errorStatus:307 print('%s at %s' % (308 errorStatus.prettyPrint(),309 errorIndex and varBinds[int(errorIndex) - 1] or '?'310 )311 )312 else:313 for name, val in varBinds:314 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))315 return val316def snmp_wlc_temprature(snmp_access):317 oid = str('1.3.6.1.4.1.14179.2.3.1.13.0')318 from pysnmp.entity.rfc3413.oneliner import cmdgen319 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access320 cmdGen = cmdgen.CommandGenerator()321 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(322 cmdgen.CommunityData(SNMP_COMMUNITY),323 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),324 oid325 )326 # Check for errors and print out results327 if errorIndication:328 print(errorIndication)329 else:330 if errorStatus:331 print('%s at %s' % (332 errorStatus.prettyPrint(),333 errorIndex and varBinds[int(errorIndex) - 1] or '?'334 )335 )336 else:337 for name, val in varBinds:338 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))339 return val340def snmp_wlc_uptime(snmp_access):341 oid = str('1.3.6.1.2.1.1.3.0')342 from pysnmp.entity.rfc3413.oneliner import cmdgen343 SNMP_HOST, SNMP_PORT, SNMP_COMMUNITY = snmp_access344 cmdGen = cmdgen.CommandGenerator()345 errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(346 cmdgen.CommunityData(SNMP_COMMUNITY),347 cmdgen.UdpTransportTarget((SNMP_HOST, SNMP_PORT)),348 oid349 )350 # Check for errors and print out results351 if errorIndication:352 print(errorIndication)353 else:354 if errorStatus:355 print('%s at %s' % (356 errorStatus.prettyPrint(),357 errorIndex and varBinds[int(errorIndex) - 1] or '?'358 )359 )360 else:361 for name, val in varBinds:362 # print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))...

Full Screen

Full Screen

sfarm-vv-pinpong1.2.py

Source:sfarm-vv-pinpong1.2.py Github

copy

Full Screen

1# coding: utf-82# 开源智慧农场终端代码,基于虚谷号(pinpong库)。3# 版本:1.24# 功能:1)支持最基本功能,发送光线、土壤湿度,订阅浇水指令;2)支持交互指令中的IP查询。5from pinpong.board import *6import siot,time7import socket8iot_server = '192.168.3.41' # mqtt服务器地址9iot_user = 'scope'10iot_pwd = 'scope'11projectid = 'sf88' # 项目编号,不同项目的id不能相同,否则进程会被踢出12t1 = 60 # 上传数据的间隔时间,单位秒13t2 = 5 # 电磁阀打开的保持时间,单位秒14soil = 500 # 土壤湿度传感器数值阈值,建议大于50015is_close_unknown = False #非正常关闭siot连接标志16a0 = None17a1 = None18start_time = None19end_time = None20i = 021topic_msg_map = {}22# 获取IP地址23def get_host_ip():24 try:25 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)26 s.connect(('8.8.8.8', 80))27 ip = s.getsockname()[0]28 finally:29 s.close()30 return ip31def on_topic_subscribe(client,userdata,msg):32 global topic_msg_map33 topic_msg_map[str(msg.topic)]=str(msg.payload.decode())34def on_topic_read(topic):35 global topic_msg_map36 result=topic_msg_map.get(topic,None)37 if result:38 del topic_msg_map[topic]39 return str(result)40def all_subscribe(): #所有的订阅相关集合 41 siot.subscribe(projectid + '/relay',on_topic_subscribe)42 siot.subscribe(projectid + '/info',on_topic_subscribe)43def on_connect(client, userdata, flags, rc):#siot连接时回调44 rc=int(rc)45 if(rc==0):#连接成功46 global is_close_unknown #如果是未知原因关闭的话 订阅会丢失 重新订阅即可47 if(is_close_unknown):48 all_subscribe()49 is_close_unknown=False50 print("非正常关闭,所以需要重新初始化订阅相关代码")51def on_disconnect(client, userdata, rc): #siot关闭连接时回调52 rc=int(rc)53 if(rc!=0): #0是正常关闭 非0即表示非正常关闭54 print("说明连接已经关闭了....")55 global is_close_unknown56 is_close_unknown=True57siot.init(projectid,iot_server,user=iot_user,password=iot_pwd)58siot.client.on_connect=on_connect59siot.client.on_disconnect=on_disconnect60siot.connect()61all_subscribe()62siot.loop()63start_time = time.time() - t164water = False65Board("uno").begin()66while True:67 a0 = Pin(Pin.A0,Pin.ANALOG).read_analog()68 a1 = Pin(Pin.A1,Pin.ANALOG).read_analog()69 if soil < a1:70 water = True71 print("超出阈值,当前为:%d"%a1)72 getcmd = on_topic_read(projectid + '/info')73 if getcmd:74 print("收到互动指令:内容为:%s"%getcmd)75 if getcmd.lower() == 'ip':76 siot.publish(projectid + '/info','ip=' + get_host_ip())77 getcmd = on_topic_read(projectid + '/relay')78 if getcmd:79 print("收到浇水指令:内容为:%s"%getcmd)80 if getcmd == '1':81 water = True82 if water:83 Pin(Pin.D2,Pin.OUT).write_digital(1)84 time.sleep(t2);85 Pin(Pin.D2,Pin.OUT).write_digital(0)86 water = not water87 88 end_time = time.time()89 if end_time - start_time > t1:90 siot.publish(projectid + '/light',a0)91 time.sleep(0.01);92 siot.publish(projectid + '/soil',a1)93 start_time = time.time()94 i=i+195 print("发送数据:%d"%i)...

Full Screen

Full Screen

vget.py

Source:vget.py Github

copy

Full Screen

12import ops, ops.data, ops.cmd3import sys4import dsz5import os6from ops.parseargs import ArgumentParser78def main():9 usage = 'Usage: python windows\\vget.py -args "-F [Full Path to File] -p [path to file] -m [mask] Optional: -t [bytes] -hex -nosend"\n\nOptions:\n-t [bytes] : grab last x bytes of file (tail)\n-nosend : move file to nosend dir\n-hex : open file in hex editor\n\nEx. python windows\\vget.py -args "-m connections.log -p C:\\Documents and Settings\\user\\logs -t 10000 -nosend"\nEx. python windows\\vget.py -args "-F C:\\Documents and Settings\\user\\logs\\connections.log -t 10000 -nosend -hex"'10 parser = ArgumentParser(usage=usage)11 parser.add_argument('-p', dest='path', nargs='+', action='store', default=False)12 parser.add_argument('-m', dest='mask', action='store', default=False)13 parser.add_argument('-F', dest='full_path', nargs='+', action='store', default=False)14 parser.add_argument('-t', dest='tail', type=int, action='store', default=False)15 parser.add_argument('--nosend', dest='nosend', action='store_true', default=False)16 parser.add_argument('--hex', dest='hex', action='store_true', default=False)17 options = parser.parse_args()18 if (len(sys.argv) == 1):19 print usage20 sys.exit(0)21 if (options.full_path == options.mask == False):22 ops.warn('No mask or full path specified! Need one or the other to execute.')23 sys.exit(0)24 mask = options.mask25 tail = options.tail26 nosend = options.nosend27 hex = options.hex28 getCmd = ops.cmd.getDszCommand('get')29 if options.full_path:30 full_path = ' '.join(options.full_path)31 getCmd.arglist.append(('"%s"' % full_path))32 else:33 if options.path:34 path = ' '.join(options.path)35 getCmd.optdict['path'] = ('"%s"' % path)36 getCmd.optdict['mask'] = mask37 if tail:38 getCmd.arglist.append(('-tail %s' % tail))39 getCmd.dszquiet = False40 getCmd.execute()41 getResult = getCmd.result42 id = getResult.cmdid43 for n in getResult.filestop:44 if (n.successful != 1):45 ops.error(('Get Failed; see cmdid %s or above output for more info' % id))46 sys.exit(0)47 localName = ''48 for n in getResult.filelocalname:49 localName = n.localname50 fullLocalPath = os.path.join(dsz.env.Get('_LOGPATH'), 'GetFiles', localName)51 if (nosend == True):52 movePath = os.path.join(dsz.env.Get('_LOGPATH'), 'GetFiles\\NoSend', localName)53 moveCmd = ops.cmd.getDszCommand(('local run -command "cmd.exe /c move %s %s"' % (fullLocalPath, movePath)))54 moveCmd.execute()55 fullLocalPath = movePath56 ops.info(('File moved to %s' % movePath))57 if (hex == False):58 ops.info('Opening file with notepad++')59 showCmd = ops.cmd.getDszCommand(('local run -command "cmd.exe /c C:\\progra~1\\notepad++\\notepad++.exe %s"' % fullLocalPath))60 else:61 ops.info('Opening file with hex editor')62 showCmd = ops.cmd.getDszCommand(('local run -command "cmd.exe /c C:\\Progra~1\\BreakP~1\\HexWor~1.2\\hworks32.exe %s"' % fullLocalPath))63 showCmd.execute()64if (__name__ == '__main__'): ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Airtest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful