How to use start_listener method in localstack

Best Python code snippet using localstack_python

smooth_leveling.py

Source: smooth_leveling.py (GitHub)

copy

Full Screen

import jpype
import time
import json
from operator import methodcaller
from jpype import *
from auto.getres import Res
# Login = None
from tools.base_logging import Log

log = Log()


class Listener:
    """Python-side callback handler for the robot SDK listener.

    An instance is wrapped in ``jpype.JProxy`` for the Java interface
    ``com.bzl.baseagv.impl.RobotListener`` (see ``test``).  Most callbacks
    only print a label followed by the payload they received.
    """

    def onSocketConnectionSuccess(self):
        """Report that the socket connection succeeded."""
        # global Login
        # Login = True
        print("BZL 连接成功")

    def onSocketDisconnection(self, e):
        """Report that the socket was disconnected."""
        print('BZL 连接断开')

    def onSocketConnectionFailed(self, e):
        """Report that the socket connection attempt failed."""
        # global Login
        # Login = False
        print('BZL 连接失败')
        # print(Login)

    def readData(self, header, body):
        """Ignore raw header/body data (debug printing is disabled)."""
        pass
        # print("读取数据")
        # print(type(header), type(body))
        # print(str(header))
        # print(str(body))

    def upDateLaserData(self, data):
        """Print updated laser data as a list."""
        print("激光数据更新")
        print(list(data))

    def upDateMapData(self, data):
        """Print updated map data as a list."""
        print("地图数据更新")
        print(list(data))

    def upDateJsonFileData(self, data):
        """Print the updated JSON path payload."""
        print("json路径更新")
        print(str(data))

    def upDateIOTMap(self, data):
        """Print the IOT map-generation notification."""
        print("IOT生成地图通知")
        print(str(data))

    def refusedConnect(self, data):
        """Print the notice that another app is already connected."""
        print("已有APP连接")
        print(str(data))

    def upDateHeart(self, data):
        """Ignore heartbeat notifications (debug printing is disabled)."""
        pass
        # print("心跳通知:" + str(data))

    def revControllerData(self, data):
        """Print data received from controller_server."""
        print("controller_server数据")
        print(str(data))

    def notifyPGMOperation(self, data):
        """Print the PGM operation notification."""
        print("notifyPGMOperation")
        print(str(data))

    def notifyYAMLOperation(self, data):
        """Print the YAML operation notification."""
        print("notifyYAMLOperation")
        print(str(data))

    def notifyJSONOperation(self, data):
        """Print the JSON operation notification."""
        print("notifyJSONOperation")
        print(str(data))

    def responseResult(self, mResponseType, mResult):
        """Print the response type and result of a request."""
        print("BZL responseResult")
        print(mResponseType)
        print(mResult)

    def responseLogin(self, mLoginInfo):
        """Print the login response."""
        # setattr(Res, 'Login', True)
        print("responseLogin")
        print(str(mLoginInfo))

    def responseMapConfig(self, mOccupancyGrid):
        """Print the map configuration response."""
        print("protobuf数据 地图配置参数")
        print(str(mOccupancyGrid))

    def responseLaserConfig(self, mLaserScan):
        """Print the laser configuration response."""
        print("protobuf数据 激光配置参数")
        print(str(mLaserScan))

    def responseMapList(self, mFileInfos):
        """Print the map list response."""
        print("protobuf数据 地图列表")
        print(str(mFileInfos))

    def responseRosStatus(self, mRosPose, mLocalizationState):
        """Print the robot pose and the localization state."""
        print("protobuf数据 机器位置和激光定位状态")
        print(str(mRosPose))
        print(str(mLocalizationState))

    def ros_communication_response_Data(self, s):
        """Capture the result of a ``cmd_type == 1000`` response into ``Res``.

        While ``Res.start_listener`` is truthy, the payload ``s`` is parsed as
        JSON; when its ``cmd_type`` is 1000, ``data[-1]['result']`` is stored
        in ``Res.data`` and ``Res.start_listener`` is reset to ``False``.

        :param s: response payload; converted with ``str()`` before parsing.
        """
        # if json.loads(str(s)).get('cmd_type') == 1000:
        #     log.info(json.loads(str(s)))
        # print('s:{}'.format(str(s)))
        # print('response_Data:{}'.format(getattr(Res, 'response_Data')))
        # if json.loads(str(s)) == getattr(Res, 'response_Data'):
        #     setattr(Res, 'response', True)
        # if getattr(Res, 'start_listener') and json.loads(str(s)).get('cmd_type') == 1001:
        #     print("开始监控:{}".format(time.time()))
        #     # print('s:{}'.format(str(s)))
        #     # print('response_Data:{}'.format(getattr(Res, 'response_Data')))
        #
        #     data = json.loads(str(s)).get('data')
        #     # print(data)
        #     setattr(Res, 'data1', data[-1]['data'])
        #     setattr(Res, 'data2', data[0]['data'])
        #     setattr(Res, 'start_listener', False)
        #     print(data[0]['data'][73])
        # print('ros_communication_response_Data')
        # print(str(s))
        # pass
        # Floor smoothing
        if not getattr(Res, 'start_listener'):
            return
        # json.loads parses a string; json.load would require a file object
        # and fail on str(s).  Parse once and reuse the result.
        payload = json.loads(str(s))
        if payload.get("cmd_type") != 1000:
            return
        data = payload.get('data')
        setattr(Res, 'data', data[-1]['result'])
        setattr(Res, 'start_listener', False)

    def revLandmarkData(self, data):
        """Print reflector (landmark) data."""
        print("反光板数据")
        print(str(data))

    def revLocationStatus(self, data):
        """Print localization status data."""
        print("定位状态数据")
        print(str(data))

    def revSwitchScene(self, data):
        """Print the scene-switch acknowledgement."""
        print("场景切换应答")
        print(str(data))

    def userManagerData(self, jsons):
        """Print the JSON data passed through from user management."""
        print("用户管理透传的json数据")
        print(str(jsons))


def readJson(file_name):
    """Load a JSON file and return the value stored under ``'TaskData'``.

    :param file_name: path of the JSON file to read.
    :return: ``data['TaskData']`` from the parsed document.
    """
    with open(file_name, 'r') as f:
        data = json.load(f)
    mapdata = data['TaskData']
    return mapdata


def send_heart(seconds, agv):
    """Send the ``cmd_type`` 2020 request repeatedly, 0.05 s apart.

    :param seconds: scaled by 20 and truncated to get the number of sends.
    :param agv: object exposing ``sendRosCommunicationRequestData``.
    """
    count = int(seconds * 20)
    for _ in range(count):
        agv.sendRosCommunicationRequestData('{"cmd_type":2020,"data":[],"seq_cmd":1}')
        time.sleep(0.05)


def pressure_request(duration, number, objects, cls_boj):
    """Repeatedly invoke a set of callables against one target object.

    :param duration: duration (number of outer rounds).
    :param number: loop count per round; also sets the pause ``1 / number``.
    :param objects: iterable of callables, e.g.
        ``(methodcaller('loginRobot', 'bzl_user2', 'ss1234567'),)`` - the
        first ``methodcaller`` argument is the method name, the remaining
        ones are that method's arguments.
    :param cls_boj: the object each callable is applied to.
    :return: None
    """
    # t1 = time.time()
    time1 = 1 / number
    # NOTE(review): the original indentation was lost; the placement of the
    # two sleeps below is a best-effort reconstruction - confirm.
    for _ in range(duration):
        for _ in range(number):
            for obj in objects:
                # start_time = time.time()
                obj(cls_boj)
                time.sleep(time1)
                # end_time = time.time()
                # print(end_time - start_time)
    # t2 = time.time()
    time.sleep(1)
    # print("end: time=" + str(t2 - t1))


def test():
    jpype.startJVM(jpype.getDefaultJVMPath(), "-ea", "-Djava.class.path=BaseAGV-2.3.1.jar")  # start the JVM
    # jpype.startJVM(jpype.getDefaultJVMPath(), "-ea", "-Djava.class.path=agvsdk.jar")
    AGVManager = jpype.JClass("com.bzl.baseagv.AGVManager")().getInstance()  # AGVManager singleton
    # AGVManager.setAGVStatusListener()
    Direction = jpype.JPackage('com.bzl.baseagv.impl').Direction  # Java enum class
    mFileAction = jpype.JPackage('com.bzl.baseagv.impl').FileType  # Java enum class
    MapType = jpype.JPackage('com.bzl.baseagv.proto.Robot').MapType  # Java enum class
    TestAction = jpype.JPackage('com.bzl.baseagv.controllerjson.testdata').TestAction  # Java enum class
    # StartTestParam = jpype.JObject('com.bzl.baseagv.controllerjson.testdata.StartTestParam')
    # StartTestParam = jpype.JClass('com.bzl.baseagv.controllerjson.testdata.StartTestParam')()
    StartTestParam = jpype.JPackage('com.bzl.baseagv.controllerjson.testdata.StartTestParam').StartTestParam
    GetErrorCodeHistoryParam = jpype.JPackage(
        'com.bzl.baseagv.controllerjson.errordata.GetErrorCodeHistoryParam').GetErrorCodeHistoryParam
    ExportErrorCodeHistoryParam = jpype.JPackage(
        'com.bzl.baseagv.controllerjson.errordata.ExportErrorCodeHistoryParam').ExportErrorCodeHistoryParam
    UserBean = jpype.JPackage('com.bzl.baseagv.controllerjson.userdata.UserBean').UserBean
    listener = jpype.JProxy("com.bzl.baseagv.impl.RobotListener", inst=Listener())  # listener implementation receiving reported data
    AGVManager.setRobotListener(listener)  # register the data-report listener
    # AGVManager.setAGVIPPort('192.168.8.1', 5979) # set robot IP and port
    # AGVManager.setAGVIPPort('192.168.1.10',
5979) # 设置机器IP和端口170 AGVManager.setAGVIPPort('192.168.1.110', 5979) # 端口错误171 # AGVManager.setAGVIPPort('192.168.1.111', 5979) # ip错误172 AGVManager.connectRobot() # 连接机器人173 time.sleep(0.5)174 AGVManager.loginRobot('bzl_user2', 'ss1234567') # 登录机器人175 # AGVManager.loginRobot('zxc123', 'asdfghjkllkjhgfdsa')176 # AGVManager.loginRobot('bzl_user', 'ss1234567') # 账号错误177 # AGVManager.loginRobot('bzl_user2', 'ss12345678') #密码错误178 # AGVManager.loginRobot('', 'ss1234567') # 账号为空179 # AGVManager.loginRobot('bzl_user2', '') #密码为空180 # AGVManager.loginRobot('auto', '')181 # while not Login:38、182 # print('1')183 # if Login is True:184 # print("登录成功")185 # else:186 # print("登录失败")187 # AGVManager.sendRosCommunicationRequestData(188 # '{"cmd_type":1001,"data":[{"addr":19539,"cmd":0,"data":[36864],"num":2}],"seq_cmd":1}')189 # time.sleep(2)190 print(getattr(Res, 'Login'))191 # while not Login:192 # print('1')193 # if Login is True:194 # print("登录成功")195 # else:196 # print("登录失败")197 while True:198 input_text = input()199 if input_text == "3": # 心跳使能200 AGVManager.setHeartEnable(True)201 elif input_text == "4": # 发送心跳202 AGVManager.sendHeart()203 elif input_text == "5": # 请求机器人信息204 AGVManager.reqControllerServerInfo()205 elif input_text == "6": # 请求机器状态(位置和定位状态)206 AGVManager.reqAllFrequentQuery()207 elif input_text == "7": # 请求激光数据208 AGVManager.reqLaserConfigAndData()209 elif input_text == "8": # 请求地图列表210 AGVManager.reqMapList()211 elif input_text == "9": # 请求机器当前地图212 AGVManager.reqMapConfigAndData()213 elif input_text == "10": # 获取指定地图214 map_name = input("输入地图名称")215 AGVManager.reqPreViewMap(map_name) # (String name)216 elif input_text == "11": # 加载地图217 map_name = input('输入地图名称')218 AGVManager.loadMapByRoomBean(map_name, MapType.MAP_TYPE_2D) # (String name)219 elif input_text == "12": # 开始建图220 AGVManager.startMapping(MapType.MAP_TYPE_2D)221 #222 elif input_text == "13": # 结束建图223 map_name = input('输入地图名称')224 AGVManager.finishMapping(map_name) # (String name)225 #226 
elif input_text == '14': # 取消建图227 AGVManager.cancelMapping()228 elif input_text == "15": # 删除地图229 map_name = input('输入地图名称')230 AGVManager.deleteMap(map_name) # (String name)231 elif input_text == "16": # 设置初始点232 AGVManager.setInitPos(jpype.JFloat(-0.7), jpype.JFloat(-0.7), jpype.JFloat(-0.8))233 elif input_text == "M": # 底盘移动向上234 t1 = time.time()235 AGVManager.move(Direction.UP, 0.1)236 time.sleep(0.25)237 AGVManager.move(Direction.DOWN, 0.1)238 time.sleep(0.25)239 AGVManager.move(Direction.LEFT, 0.1)240 time.sleep(0.25)241 AGVManager.move(Direction.RIGHT, 0.1)242 time.sleep(0.25)243 t2 = time.time()244 print(t2 - t1)245 elif input_text == "w": # 底盘移动向上246 speed = float(input("请输入速度(0.1表示100mm/s)"))247 for n in range(1):248 AGVManager.move(Direction.UP, speed)249 time.sleep(0.5)250 elif input_text == "s": # 底盘移动向下251 speed = float(input("请输入速度(0.1表示100mm/s)"))252 for n in range(1):253 AGVManager.move(Direction.DOWN, speed)254 time.sleep(0.5)255 elif input_text == "a": # 底盘移动向左256 speed = float(input("请输入速度(0.1表示100mm/s)"))257 AGVManager.move(Direction.LEFT, speed)258 elif input_text == "d": # 底盘移动向右259 speed = float(input("请输入速度(0.1表示100mm/s)"))260 AGVManager.move(Direction.RIGHT, speed)261 elif input_text == "aa": # 底盘移动左转移动262 speed = float(input("请输入速度(0.1表示100mm/s)"))263 AGVManager.move(Direction.YAW_LEFT, speed)264 time.sleep(1)265 # AGVManager.move(Direction.YAW_RIGHT, speed)266 # time.sleep(1)267 # AGVManager.move(Direction.UP, speed)268 # time.sleep(1)269 # AGVManager.move(Direction.DOWN, speed)270 elif input_text == "dd": # 底盘移动右转移动271 speed = float(input("请输入速度(0.1表示100mm/s)"))272 AGVManager.move(Direction.YAW_RIGHT, speed)273 # AGVManager.move(Direction.UP, 300)274 elif input_text == 'wa': # 左前方曲线运动275 # speed = float(input("请输入速度(0.1表示100mm/s)"))276 l_x = 0.1277 l_y = 0.1278 l_z = 0.1279 a_x = 0.1280 a_y = 0.1281 a_z = 0.1282 AGVManager.move(l_x, l_y, l_z, a_x, a_y, a_z)283 elif input_text == "17": # 底盘移动右转移动 超出范围284 
AGVManager.move(Direction.YAW_RIGHT, 3)285 AGVManager.move(Direction.UP, 600)286 elif input_text == "18": # 底盘移动右转移动 超出范围287 AGVManager.move(Direction.YAW_RIGHT, -1)288 AGVManager.move(Direction.UP, -100)289 elif input_text == "19": # 清除故障290 ty = input("bit0:复位清空运控故障(需等待1s左右) bit1:清空电机故障(很快) bit2:复位电机驱动,需等待10s左右")291 ty = int(ty)292 AGVManager.clearFault(ty)293 elif input_text == "20": # 清除任务294 AGVManager.clearRoutePose()295 elif input_text == "21": # 获取反光板数据296 AGVManager.getLandMarkData()297 elif input_text == "22": # 硬急停复位298 AGVManager.resetHardEmergencyStop()299 elif input_text == "23": # 获取委外测试状态信息300 AGVManager.getFunctionTestState()301 elif input_text == "24": # 单点导航302 AGVManager.setNavigationPoint(1.2, 1.4, 0.05) # (double x, double y, float angle)303 elif input_text == "25": # 启动定距离移动304 AGVManager.fixedDistanceMoveStart(20, 0, 0.05) # (double x, double y,int speed)305 elif input_text == "26": # 停止定距离移动306 AGVManager.fixedDistanceMoveStop()307 #308 # elif input_text == "27": # 单点多点导航数据组装309 # """参数:310 # taskID - 任务ID311 # mapName - 地图名字312 # navigationLoopNumber - 循环次数313 # OrgPt - 起始点314 # mWorkPointBeans - 路径点"""315 # lst = jpype.java.util.ArrayList()316 # lst.add(WorkPointBean(jpype.JDouble(1.2), jpype.JDouble(1.2)))317 # AGVManager.moveRoutePose(1, "map1", 1, WorkPointBean(jpype.JDouble(1.2), jpype.JDouble(1.2)),318 # lst) # (int taskID, String mapName, int navigationLoopNumber, WorkPointBean OrgPt, List<WorkPointBean> mWorkPointBeans)319 elif input_text == "28": # 暂停320 AGVManager.pauseRoutePose()321 elif input_text == "29": # 继续322 AGVManager.continueRoutePose()323 elif input_text == "30": # 停止324 AGVManager.cancelRoutePose()325 elif input_text == "31": # cyber1.1开始任务326 AGVManager.runRoutePose()327 # elif input_text == "31": # ros和cyber1.0开始 从哪个点开始,默认0328 # AGVManager.startRoutePose(1)329 elif input_text == "32": # 开始 从哪个点开始,默认0 workIndex - 开始工艺作业点330 AGVManager.startRoutePose(1, 2) # (int position, int workIndex)331 elif input_text == "33": # 
自动模式332 AGVManager.turnAutoMode()333 elif input_text == "34": # 手动模式334 AGVManager.turnManualMode()335 elif input_text == "35": # 急停336 AGVManager.quickStop()337 elif input_text == "36": # 急停复位338 AGVManager.cancelQuickStop()339 elif input_text == "37": # 获取异常断电作业恢复信息340 AGVManager.getResumeWorkInfo()341 # elif input_text == "38": # 操作PGM、YAML、JSON文件-增加|修改JSON文件342 # # JSON_DELETE:删除json文件; JSON_EDIT:增加|修改Json文件;PGM_EDIT:增加|修改PGM文件;YAML_EDIT:增加|修改YAML文件343 # with open('map1.json', 'rb') as f:344 # data = f.read()345 # AGVManager.editFileInfo(mFileAction.JSON_EDIT, 'map1.json',346 # data) # (FileType mFileAction, String name, byte[] datas)347 # elif input_text == "39": # 操作PGM、YAML、JSON文件-增加|修改PGM文件348 # # JSON_DELETE:删除json文件; JSON_EDIT:增加|修改Json文件;PGM_EDIT:增加|修改PGM文件;YAML_EDIT:增加|修改YAML文件349 # with open('map1.pgm', 'rb') as f:350 # data = f.read()351 # AGVManager.editFileInfo(mFileAction.PGM_EDIT, 'map1.pgm',352 # data) # (FileType mFileAction, String name, byte[] datas)353 # elif input_text == "40": # 操作PGM、YAML、JSON文件-增加|修改YAML文件354 # # JSON_DELETE:删除json文件; JSON_EDIT:增加|修改Json文件;PGM_EDIT:增加|修改PGM文件;YAML_EDIT:增加|修改YAML文件355 # with open('map1.yaml', 'rb') as f:356 # data = f.read()357 # AGVManager.editFileInfo(mFileAction.YAML_EDIT, 'map1.yaml',358 # data) # (FileType mFileAction, String name, byte[] datas)359 # elif input_text == "41": # 操作PGM、YAML、JSON文件-增加|修改JSON文件360 # # JSON_DELETE:删除json文件; JSON_EDIT:增加|修改Json文件;PGM_EDIT:增加|修改PGM文件;YAML_EDIT:增加|修改YAML文件361 # with open('map_0711.json', 'rb') as f:362 # data = f.read()363 # AGVManager.editFileInfo(mFileAction.JSON_DELETE, 'map_0711.json',364 # data) # (FileType mFileAction, String name, byte[] datas)365 elif input_text == "42": # cyber1.1设置开始站点366 AGVManager.setRouteStartPose(1)367 elif input_text == "43": # cyber1.1设置开始站点368 AGVManager.setRouteStartPose(1, 1)369 elif input_text == "44": # 3D建图使用,启动定位370 AGVManager.startLocalizationNode()371 elif input_text == "45": # 切换机器运行场景372 i = input("输入场景:1室内,0室外")373 
a = int(i)374 AGVManager.switchScence(a)375 elif input_text == '46': # 获取里程计检查信息376 AGVManager.getOdomInspectInfo()377 elif input_text == '47': # 开始启动测试378 """379 testType - 测试类型,0默认, 1间隔,2连续380 runDis - 运动距离,单位m381 """382 AGVManager.startFunctionTest(0, 1)383 elif input_text == '48': # 开始里程计检查384 AGVManager.startInspectOdom()385 elif input_text == '49': # 停止测试386 AGVManager.stopFunctionTest()387 elif input_text == "50": # AGV移动状态-自动模式下开保护盖388 AGVManager.sendRosCommunicationRequestData(389 '{"cmd_type":1001,"data":[{"addr":4097,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')390 elif input_text == "51": # AGV移动状态-自动模式下开保护盖391 AGVManager.sendRosCommunicationRequestData(392 '{"cmd_type":1001,"data":[{"addr":4097,"cmd":5,"data":[4],"num":2}],"seq_cmd":1}')393 # elif input_text == "52": # AGV移动状态-自动模式下关保护盖394 # AGVManager.sendRosCommunicationRequestData(395 # '{"cmd_type":1001,"data":[{"addr":4097,"cmd":5,"data":[3],"num":2}],"seq_cmd":1}')396 #397 # elif input_text == "53": # AGV移动状态-值异常398 # AGVManager.sendRosCommunicationRequestData(399 # '{"cmd_type":1001,"data":[{"addr":4097,"cmd":5,"data":[2],"num":2}],"seq_cmd":1}')400 #401 # elif input_text == "54": # AGV移动状态-值异常402 # AGVManager.sendRosCommunicationRequestData(403 # '{"cmd_type":1001,"data":[{"addr":4097,"cmd":5,"data":[2],"num":2}],"seq_cmd":1}')404 # elif input_text == "54": # 整体回零取消405 # AGVManager.sendRosCommunicationRequestData(406 # '{"cmd_type":1001,"data":[{"addr":4181,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')407 #408 # elif input_text == "55": # 整体回零409 # AGVManager.sendRosCommunicationRequestData(410 # '{"cmd_type":1001,"data":[{"addr":4181,"cmd":5,"data":[11],"num":2}],"seq_cmd":1}')411 #412 # elif input_text == "56": # 一级升降回零413 # AGVManager.sendRosCommunicationRequestData(414 # '{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[16],"num":2}],"seq_cmd":1}')415 #416 # elif input_text == "57": # 一级升降回零取消417 # AGVManager.sendRosCommunicationRequestData(418 # 
'{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')419 #420 # elif input_text == "58": # 一级升降设置目标位置421 # AGVManager.sendRosCommunicationRequestData(422 # '{"cmd_type":1001,"data":[{"addr":4129,"cmd":5,"data":[200],"num":2}],"seq_cmd":1}')423 #424 # elif input_text == "59": # 一级升降设置目标速度425 # AGVManager.sendRosCommunicationRequestData(426 # '{"cmd_type":1001,"data":[{"addr":4130,"cmd":5,"data":[30],"num":2}],"seq_cmd":1}')427 #428 # elif input_text == "60": # 一级升降位置模式开启429 # AGVManager.sendRosCommunicationRequestData(430 # '{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[8],"num":2}],"seq_cmd":1}')431 #432 # elif input_text == "61": # 一级升降模式取消433 # AGVManager.sendRosCommunicationRequestData(434 # '{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')435 #436 # elif input_text == "62": # 一级升降点动模式开启437 # AGVManager.sendRosCommunicationRequestData(438 # '{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[1],"num":2}],"seq_cmd":1}')439 #440 # elif input_text == "63": # 一级升降复位441 # AGVManager.sendRosCommunicationRequestData(442 # '{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[32],"num":2}],"seq_cmd":1}')443 #444 # elif input_text == "64": # 一级升降复位取消445 # AGVManager.sendRosCommunicationRequestData(446 # '{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')447 #448 # elif input_text == "65": # 一级点动模式升起449 # AGVManager.sendRosCommunicationRequestData(450 # '{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[3],"num":2}],"seq_cmd":1}')451 # time.sleep(1)452 # AGVManager.sendRosCommunicationRequestData(453 # '{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')454 #455 # elif input_text == "66": # 一级点动模式下降456 # AGVManager.sendRosCommunicationRequestData(457 # '{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[5],"num":2}],"seq_cmd":1}')458 # time.sleep(1)459 # AGVManager.sendRosCommunicationRequestData(460 # 
'{"cmd_type":1001,"data":[{"addr":4128,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')461 #462 # elif input_text == "67": # 二级升降回零463 # AGVManager.sendRosCommunicationRequestData(464 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[16],"num":2}],"seq_cmd":1}')465 #466 # elif input_text == "68": # 二级升降回零取消467 # AGVManager.sendRosCommunicationRequestData(468 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')469 #470 # elif input_text == "69": # 二级升降目标位置设置471 # AGVManager.sendRosCommunicationRequestData(472 # '{"cmd_type":1001,"data":[{"addr":4132,"cmd":5,"data":[200],"num":2}],"seq_cmd":1}')473 #474 # elif input_text == "70": # 二级升降目标速度设置475 # AGVManager.sendRosCommunicationRequestData(476 # '{"cmd_type":1001,"data":[{"addr":4133,"cmd":5,"data":[30],"num":2}],"seq_cmd":1}')477 #478 # elif input_text == "71": # 二级升降位置模式开启479 # AGVManager.sendRosCommunicationRequestData(480 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[8],"num":2}],"seq_cmd":1}')481 #482 # elif input_text == "72": # 二级升降位置模式取消483 # AGVManager.sendRosCommunicationRequestData(484 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')485 #486 # elif input_text == "73": # 二级升降复位487 # AGVManager.sendRosCommunicationRequestData(488 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[32],"num":2}],"seq_cmd":1}')489 #490 # elif input_text == "74": # 二级升降复位取消491 # AGVManager.sendRosCommunicationRequestData(492 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')493 #494 # elif input_text == "75": # 二级升降点动模式开启495 # AGVManager.sendRosCommunicationRequestData(496 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[1],"num":2}],"seq_cmd":1}')497 #498 # elif input_text == "76": # 二级升降点动模式取消499 # AGVManager.sendRosCommunicationRequestData(500 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')501 #502 # elif input_text == "77": # 二级升降点动模式升起503 # 
AGVManager.sendRosCommunicationRequestData(504 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[3],"num":2}],"seq_cmd":1}')505 # time.sleep(1)506 # AGVManager.sendRosCommunicationRequestData(507 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')508 #509 # elif input_text == "78": # 二级升降点动模式下降510 # AGVManager.sendRosCommunicationRequestData(511 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[5],"num":2}],"seq_cmd":1}')512 # time.sleep(1)513 # AGVManager.sendRosCommunicationRequestData(514 # '{"cmd_type":1001,"data":[{"addr":4131,"cmd":5,"data":[0],"num":2}],"seq_cmd":1}')515 #516 # elif input_text == "79": # 上装通用方法517 # addr = input("请输入寄存器地址:")518 # add = int(addr, 16)519 # data = input("请输入data值:")520 # try:521 # data = int(data)522 # except:523 # data = float(data)524 # # print(data)525 # start = input("是否开启监听(y/n):")526 # if start == 'y':527 # setattr(Res, 'start_listener', True)528 # time.sleep(1)529 # data1 = getattr(Res, 'data1')530 # data3 = getattr(Res, 'data2')531 # setattr(Res, 'data1', None)532 # setattr(Res, 'data2', None)533 # print("TX2写入数据:{}".format(data1))534 # print("PLC写入数据:{}".format(data3))535 # print('....................')536 # json_data = '{"cmd_type":1001,"data":[{"addr":%s,"cmd":5,"data":[%s],"num":2}],"seq_cmd":1}' % (add, data)537 # AGVManager.sendRosCommunicationRequestData(json_data)538 # time.sleep(1)539 # if start == 'y':540 # setattr(Res, 'start_listener', True)541 # time.sleep(2)542 # data2 = getattr(Res, 'data1')543 # data4 = getattr(Res, 'data2')544 # print("TX2写入数据:{}".format(data2))545 # print("PLC写入数据:{}".format(data4))546 # # print(data2[85], data1[85])547 # # print(data3==data4)548 #549 # for i in range(len(data2)):550 # # print("i{}:{}-----{}".format(i, data1[i], data2[i]))551 # # if i != data1[data2.index(i)] and data2.index(i) > 0:552 # if data1[i] != data2[i]:553 # print("TX2写入内存地址变化位置为:{},原值:{},现值:{}".format(i, data1[i], data2[i]))554 # print("---------")555 # for i in 
range(len(data4)):556 # # print("i:{}-----{}".format(i, data1[data2.index(i)]))557 # # if i != data3[data4.index(i)] and data4.index(i) not in [16, 17]:558 # if data3[i] != data4[i]:559 # print("PLC写入内存地址变化位置为:{},原值:{},现值:{}".format(i, data3[i], data4[i]))560 # setattr(Res, 'start_listener', False)561 # setattr(Res, 'data1', None)562 # setattr(Res, 'data2', None)563 #564 #565 # elif input_text == "94": # 路径下发566 # mapData = str(readJson('map_0710.json'))567 # msg = AGVManager.sendRosCommunicationRequestData('{"cmd_type":2000,"data":[' + mapData + '],"seq_cmd":1}')568 # print(msg)569 #570 # elif input_text == '95': # 登录接口压力测试571 # pressure_request(60, 2, (methodcaller('loginRobot', 'bzl_user2', 'ss1234567'),), AGVManager)572 # print('登录接口压力测试')573 #574 # elif input_text == '96': # 获取地图列表接口压力测试575 # pressure_request(60, 50, (methodcaller('reqMapList'),), AGVManager)576 # print('获取地图列表接口压力测试')577 #578 # elif input_text == '97': # 获取激光点云数据接口压力测试579 # pressure_request(60, 50, (methodcaller('reqLaserConfigAndData'),), AGVManager)580 # print('获取激光点云数据接口压力测试')581 #582 # elif input_text == '98': # 获取机器的信息接口压力测试583 # pressure_request(60, 50, (methodcaller('reqControllerServerInfo'),), AGVManager)584 # print('获取机器的信息接口压力测试')585 #586 # elif input_text == '99': # 获取机器的状态信息接口压力测试587 # pressure_request(60, 50, (methodcaller('reqAllFrequentQuery'),), AGVManager)588 # print('获取机器的状态信息接口压力测试')589 #590 # elif input_text == '100': # 加载地图接口压力测试591 # pressure_request(60, 10, (methodcaller('loadMapByRoomBean', 'yyy', MapType.MAP_TYPE_2D),), AGVManager)592 # print('加载地图接口压力测试')593 #594 # elif input_text == '997':595 # # 导出历史故障596 # e = ExportErrorCodeHistoryParam()597 # e.setEnd_time(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))598 # jl = java.util.ArrayList()599 # # 错误码范围E01000到E06017600 # # a = input("输入起始故障码范围")601 # # b = input("输入结束故障码范围")602 # a = 'E01000'603 # b = 'E06000'604 # jl.add(a)605 # jl.add(b)606 # e.setError_code_range(jl)607 # e.setStart_time('2020-09-01 
12:12:12')608 # AGVManager.exportFaultHistory(e)609 #610 #611 # elif input_text == '998':612 # # 分页查询机器历史故障613 # g = GetErrorCodeHistoryParam()614 # g.setEnd_time(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))615 # jl = java.util.ArrayList()616 # # 错误码范围E01000到E06017617 # a = input("输入起始故障码范围")618 # b = input("输入结束故障码范围")619 # jl.add(a)620 # jl.add(b)621 # g.setError_code_range(jl)622 # g.setPage_count(5)623 # g.setPage_num(1)624 # g.setStart_time('2020-09-01 12:12:12')625 # AGVManager.getFaultHistoryList(g)626 # print(123)627 #628 # elif input_text == '999':629 # # 开始启动委外测试630 # s = StartTestParam()631 # s.setAngular_speed(0)632 # s.setLine_speed(0.3)633 # s.setMove_duration(1)634 # list = [TestAction.TEST_ACTION_GO_AHEAD, TestAction.TEST_ACTION_GO_BACK]635 # jl = java.util.ArrayList()636 # for i in list:637 # jl.add(i)638 # s.setMove_mode_queue(jl)639 # s.setRotate_duration(0.1)640 # s.setStop_duration(0.1)641 # AGVManager.startFunctionTest(s)642 #643 # elif input_text == "120": # 发送心跳压力测试644 # send_heart(60, AGVManager)645 #646 # elif input_text == '1000': # 新增账号信息647 # u = UserBean()648 # name = input('输入账号名:')649 # u.setUserName(name)650 # nick_name = input('输入用户名:')651 # u.setNickName(nick_name)652 # pwd = input('输入密码:')653 # u.setPassword(pwd)654 # role_num = input('选择角色:1.管理员 2.监测员 3.操作员')655 # role_dict = {'1': u.ROLE_ADMIN, '2': u.ROLE_BUILDER, '3': u.ROLE_OPERATOR}656 # role = role_dict[role_num]657 # u.setRole(role)658 # c_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())659 # u.setCreateTime(c_time)660 # print(u)661 # AGVManager.addNewUser(u)662 #663 # elif input_text == '1001': # 获取用户列表664 # AGVManager.getUserList()665 #666 # elif input_text == '1002':667 # name = input('输入要删除的账号')668 # AGVManager.deleteUser(name)669 #670 # elif input_text == '1003': # 编辑账号信息671 # u = UserBean()672 # name = input('输入账号名:')673 # u.setUserName(name)674 # nick_name = input('输入用户名:')675 # u.setNickName(nick_name)676 # pwd = input('输入密码:')677 # 
u.setPassword(pwd)678 # role_num = input('选择角色:1.管理员 2.监测员 3.操作员')679 # role_dict = {'1': u.ROLE_ADMIN, '2': u.ROLE_BUILDER, '3': u.ROLE_OPERATOR}680 # role = role_dict[role_num]681 # u.setRole(role)682 # c_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())683 # u.setCreateTime(c_time)684 # print(u)685 # AGVManager.editUserInfo(u)686 #687 # elif input_text == "1004": # 以指定速度朝正方向移动X688 # AGVManager.sendRosCommunicationRequestData(689 # '{"cmd_type":1001,"data":[{"addr":8211,"cmd":5,"data":[10],"num":2}],"seq_cmd":1}')690 #691 # elif input_text == '1005':692 # data = {693 # "cmd_type": 1000,694 # "data": {695 # "req_cmd_type": 1008,696 # "result": 0697 # },698 # "seq_cmd": 1,699 # "version": "1.0"700 # }701 #702 # setattr(Res, 'response_Data', data)703 # print(getattr(Res, 'response_Data'))704 # AGVManager.turnAutoMode()705 # time.sleep(1)706 # print(getattr(Res, 'response'))707 #708 # setattr(Res, 'response', False)709 # data1 = {710 # "cmd_type": 1000,711 # "data": {712 # "req_cmd_type": 1009,713 # "result": 0714 # },715 # "seq_cmd": 2,716 # "version": "1.0"717 # }718 # setattr(Res, 'response_Data', data1)719 # AGVManager.turnManualMode()720 # time.sleep(1)721 # print(getattr(Res, 'response'))722 elif input_text == "90001": # 参数设置数据获取723 setattr(Res, 'start_listener', True)724 time.sleep(1)725 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, "seq_cmd":9000, "data":{}}')726 time.sleep(1)727 assert_fun(0)728 elif input_text == "90002": # 参数设置数据获取-cmd_type为字符串729 setattr(Res, 'start_listener', True)730 time.sleep(1)731 AGVManager.sendRosCommunicationRequestData('{"cmd_type":"9000", "seq_cmd":9000, "data":{}}')732 time.sleep(1)733 assert_fun(1)734 elif input_text == "90003": # 参数设置数据获取-cmd_type浮点数735 setattr(Res, 'start_listener', True)736 time.sleep(1)737 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000.0, "seq_cmd":9000, "data":{}}')738 time.sleep(1)739 assert_fun(1)740 elif input_text == "90004": # 参数设置数据获取-cmd_type为None741 
setattr(Res, 'start_listener', True)742 time.sleep(1)743 AGVManager.sendRosCommunicationRequestData('{"cmd_type":None, "seq_cmd":9000, "data":{}}')744 time.sleep(1)745 assert_fun(1)746 elif input_text == "90005": # 参数设置数据获取-cmd_type为""747 setattr(Res, 'start_listener', True)748 time.sleep(1)749 AGVManager.sendRosCommunicationRequestData('{"cmd_type":"", "seq_cmd":9000, "data":{}}')750 time.sleep(1)751 assert_fun(1)752 elif input_text == "90006": # 参数设置数据获取-cmd_type参数缺失753 setattr(Res, 'start_listener', True)754 time.sleep(1)755 AGVManager.sendRosCommunicationRequestData('{"seq_cmd":9000, "data":{}}')756 time.sleep(1)757 assert_fun(1)758 elif input_text == "90007": # 参数设置数据获取-seq_cmd为字符创759 setattr(Res, 'start_listener', True)760 time.sleep(1)761 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, "seq_cmd":"9000", "data":{}}')762 time.sleep(1)763 assert_fun(1)764 elif input_text == "90008": # 参数设置数据获取-seq_cmd为字符串765 setattr(Res, 'start_listener', True)766 time.sleep(1)767 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, "seq_cmd":"9000", "data":{}}')768 time.sleep(1)769 assert_fun(1)770 elif input_text == "90009": # 参数设置数据获取-seq_cmd为""771 setattr(Res, 'start_listener', True)772 time.sleep(1)773 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, "seq_cmd":"", "data":{}}')774 time.sleep(1)775 assert_fun(1)776 elif input_text == "900010": # 参数设置数据获取-seq_cmd为None777 setattr(Res, 'start_listener', True)778 time.sleep(1)779 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, "seq_cmd":None, "data":{}}')780 time.sleep(1)781 assert_fun(1)782 elif input_text == "900011": # 参数设置数据获取-seq_cmd缺失783 setattr(Res, 'start_listener', True)784 time.sleep(1)785 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, "data":{}}')786 time.sleep(1)787 assert_fun(1)788 elif input_text == "900012": # 参数设置数据获取-data为字符串789 setattr(Res, 'start_listener', True)790 time.sleep(1)791 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, 
"seq_cmd":9000, "data":"test"')792 time.sleep(1)793 assert_fun(1)794 elif input_text == "900013": # 参数设置数据获取-data为""795 setattr(Res, 'start_listener', True)796 time.sleep(1)797 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, "seq_cmd":9000, "data":""')798 time.sleep(1)799 assert_fun(1)800 elif input_text == "900014": # 参数设置数据获取-data为None801 setattr(Res, 'start_listener', True)802 time.sleep(1)803 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, "seq_cmd":9000, "data":None')804 time.sleep(1)805 assert_fun(1)806 elif input_text == "900015": # 参数设置数据获取-data为缺失807 setattr(Res, 'start_listener', True)808 time.sleep(1)809 AGVManager.sendRosCommunicationRequestData('{"cmd_type":9000, "seq_cmd":9000')810 time.sleep(1)811 assert_fun(1)812 elif input_text == "100001": # 上装模式切换为作业模式813 setattr(Res, 'start_listener', True)814 time.sleep(1)815 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10001, "seq_cmd":10001, "data":{}}') # 先切回普通模式816 time.sleep(1)817 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}')818 time.sleep(1)819 assert_fun(0)820 elif input_text == "100002": # 上装作业模式切换为作业模式821 setattr(Res, 'start_listener', True)822 time.sleep(1)823 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}')824 time.sleep(1)825 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}')826 time.sleep(1)827 assert_fun(0)828 elif input_text == "100011": # 上装作业模式切换为普通模式829 setattr(Res, 'start_listener', True)830 time.sleep(1)831 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 先切回作业模式832 time.sleep(1)833 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10001, "seq_cmd":10001, "data":{}}')834 time.sleep(1)835 assert_fun(0)836 elif input_text == "100012": # 上装普通模式切换为普通模式837 setattr(Res, 'start_listener', True)838 time.sleep(1)839 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10001, 
"seq_cmd":10001, "data":{}}')840 time.sleep(1)841 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10001, "seq_cmd":10001, "data":{}}')842 time.sleep(1)843 assert_fun(0)844 elif input_text == "100031": # 作业模式 机器开始作业845 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式846 setattr(Res, 'start_listener', True)847 time.sleep(1)848 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10003, "seq_cmd":10003, "data":{}}') # 开始作业849 time.sleep(1)850 assert_fun(0)851 setattr(Res, 'start_listener', True)852 time.sleep(10)853 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10004, "seq_cmd":10004, "data":{}}') # 停止作业854 time.sleep(1)855 assert_fun(0)856 elif input_text == "100032": # 作业模式 开始振捣857 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式858 setattr(Res, 'start_listener', True)859 time.sleep(1)860 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10005, "seq_cmd":10005, "data":{}}') # 开始振捣861 time.sleep(1)862 assert_fun(0)863 elif input_text == "100033": # 作业模式 开始作业 停止振捣864 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式865 setattr(Res, 'start_listener', True)866 time.sleep(1)867 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10005, "seq_cmd":10005, "data":{}}') # 开始作业868 time.sleep(1)869 assert_fun(0)870 setattr(Res, 'start_listener', True)871 time.sleep(10)872 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10004, "seq_cmd":10004, "data":{}}') # 停止作业873 time.sleep(1)874 assert_fun(0)875 setattr(Res, 'start_listener', True)876 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10006, "seq_cmd":10006, "data":{}}') # 停止振捣877 time.sleep(1)878 assert_fun(1)879 elif input_text == "100034": # 作业模式 停止作业 不是用例880 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10004, "seq_cmd":10004, "data":{}}') # 停止作业881 elif input_text == "100035": # 作业模式 激光自动关闭882 
AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式883 setattr(Res, 'start_listener', True)884 time.sleep(1)885 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10004, "seq_cmd":10004, "data":{}}') # 激光自动关闭886 time.sleep(1)887 assert_fun(0)888 elif input_text == "100036": # 作业模式 左推杆上889 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式890 setattr(Res, 'start_listener', True)891 time.sleep(1)892 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10009, "seq_cmd":10009, "data":{}}') # 左推杆上893 time.sleep(1)894 assert_fun(1)895 elif input_text == "100037": # 作业模式 左推杆下896 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式897 setattr(Res, 'start_listener', True)898 time.sleep(1)899 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10010, "seq_cmd":10010, "data":{}}') # 左推杆下900 time.sleep(1)901 assert_fun(1)902 elif input_text == "100038": # 作业模式 右推杆上903 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式904 setattr(Res, 'start_listener', True)905 time.sleep(1)906 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10011, "seq_cmd":10011, "data":{}}') # 右推杆上907 time.sleep(1)908 assert_fun(1)909 elif input_text == "100039": # 作业模式 右推杆下910 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式911 setattr(Res, 'start_listener', True)912 time.sleep(1)913 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10012, "seq_cmd":10012, "data":{}}') # 右推杆下914 time.sleep(1)915 assert_fun(1)916 elif input_text == "1000310": # 作业模式 左右推杆上917 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式918 setattr(Res, 'start_listener', True)919 time.sleep(1)920 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10013, "seq_cmd":10013, "data":{}}') # 右推杆上921 time.sleep(1)922 
assert_fun(1)923 elif input_text == "1000311": # 作业模式 左右推杆下924 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式925 setattr(Res, 'start_listener', True)926 time.sleep(1)927 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10014, "seq_cmd":10014, "data":{}}') # 右推杆下928 time.sleep(1)929 assert_fun(1)930 elif input_text == "1000312": # 作业模式 振捣开启931 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式932 setattr(Res, 'start_listener', True)933 time.sleep(1)934 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10017, "seq_cmd":10017, "data":{}}') # 振捣开启935 time.sleep(10)936 assert_fun(1)937 elif input_text == "1000313": # 作业模式-开始作业-振捣关闭938 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式939 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10003, "seq_cmd":10003, "data":{}}') # 开始作业940 time.sleep(10)941 setattr(Res, 'start_listener', True)942 time.sleep(1)943 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10018, "seq_cmd":10018, "data":{}}') # 振捣关闭944 time.sleep(1)945 assert_fun(1)946 elif input_text == "100032": # 普通模式 机器开始作业947 setattr(Res, 'start_listener', True)948 time.sleep(1)949 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10001, "seq_cmd":10001, "data":{}}') # 普通模式950 time.sleep(1)951 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10003, "seq_cmd":10003, "data":{}}') # 开始作业952 time.sleep(1)953 assert_fun(1)954 # setattr(Res, 'start_listener', True)955 # time.sleep(10)956 # AGVManager.sendRosCommunicationRequestData('{"cmd_type":10004, "seq_cmd":10004, "data":{}}') # 停止作业957 # time.sleep(1)958 # assert_fun(1)959 elif input_text == "100032": # 作业过程中切换为普通模式960 setattr(Res, 'start_listener', True)961 time.sleep(1)962 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式963 time.sleep(1)964 
AGVManager.sendRosCommunicationRequestData('{"cmd_type":10003, "seq_cmd":10003, "data":{}}') # 开始作业965 time.sleep(1)966 assert_fun(1)967 setattr(Res, 'start_listener', True)968 time.sleep(10)969 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10001, "seq_cmd":10001, "data":{}}') # 普通模式970 time.sleep(1)971 assert_fun(1)972 elif input_text == "100033": # 停止作业 # 不是用例973 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10004, "seq_cmd":10004, "data":{}}')974 elif input_text == "100051": # 普通模式 开始振捣 停止振捣975 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10001, "seq_cmd":10001, "data":{}}') # 切换为普通模式976 setattr(Res, 'start_listener', True)977 time.sleep(1)978 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10005, "seq_cmd":10005, "data":{}}') # 开始振捣979 time.sleep(10)980 assert_fun(0)981 setattr(Res, 'start_listener', True)982 time.sleep(1)983 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10006, "seq_cmd":10006, "data":{}}') # 停止振捣984 time.sleep(1)985 assert_fun(0)986 elif input_text == "100052": # 振捣时改为作业模式987 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10001, "seq_cmd":10001, "data":{}}') # 切换为普通模式988 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10005, "seq_cmd":10005, "data":{}}') # 开始振捣989 time.sleep(10)990 setattr(Res, 'start_listener', True)991 time.sleep(1)992 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10000, "seq_cmd":10000, "data":{}}') # 作业模式993 time.sleep(1)994 assert_fun(0)995 elif input_text == "100053": # 停止振捣 # 不是用例996 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10004, "seq_cmd":10004, "data":{}}')997 elif input_text == "100070": # 普通模式 激光自动开启 关闭998 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10001, "seq_cmd":10001, "data":{}}') # 切换为普通模式999 setattr(Res, 'start_listener', True)1000 time.sleep(1)1001 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10007, "seq_cmd":10007, "data":{}}') # 激光自动开启1002 time.sleep(5)1003 assert_fun(0)1004 setattr(Res, 
'start_listener', True)1005 time.sleep(1)1006 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10008, "seq_cmd":10008, "data":{}}') # 激光自动关闭1007 time.sleep(10)1008 assert_fun(1)1009 elif input_text == "100081": # 激光自动关闭 左推杆上1010 setattr(Res, 'start_listener', True)1011 time.sleep(1)1012 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10008, "seq_cmd":10008, "data":{}}') # 激光自动关闭1013 time.sleep(2)1014 assert_fun(0)1015 setattr(Res, 'start_listener', True)1016 time.sleep(1)1017 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10009, "seq_cmd":10009, "data":{}}') # 左推杆上1018 time.sleep(0.5)1019 assert_fun(0)1020 setattr(Res, 'start_listener', True)1021 AGVManager.sendRosCommunicationRequestData(1022 '{"cmd_type":10100,"data":{"cancel_cmd":10009},"seq_cmd":1,"version":0}') # 取消上装1023 time.sleep(0.5)1024 assert_fun(0)1025 elif input_text == "100082": # 激光自动关闭 左推杆下1026 setattr(Res, 'start_listener', True)1027 time.sleep(1)1028 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10008, "seq_cmd":10008, "data":{}}') # 激光自动关闭1029 time.sleep(2)1030 assert_fun(0)1031 setattr(Res, 'start_listener', True)1032 time.sleep(1)1033 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10010, "seq_cmd":10010, "data":{}}') # 左推杆下1034 time.sleep(0.5)1035 assert_fun(0)1036 setattr(Res, 'start_listener', True)1037 AGVManager.sendRosCommunicationRequestData(1038 '{"cmd_type":10100,"data":{"cancel_cmd":10010},"seq_cmd":1,"version":0}') # 取消上装1039 time.sleep(0.5)1040 assert_fun(0)1041 elif input_text == "100083": # 激光自动关闭 右推杆上1042 setattr(Res, 'start_listener', True)1043 time.sleep(1)1044 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10008, "seq_cmd":10008, "data":{}}') # 激光自动关闭1045 time.sleep(2)1046 assert_fun(0)1047 setattr(Res, 'start_listener', True)1048 time.sleep(1)1049 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10011, "seq_cmd":10011, "data":{}}') # 右推杆上1050 time.sleep(0.5)1051 assert_fun(0)1052 setattr(Res, 'start_listener', 
True)1053 AGVManager.sendRosCommunicationRequestData(1054 '{"cmd_type":10100,"data":{"cancel_cmd":10011},"seq_cmd":1,"version":0}') # 取消上装1055 time.sleep(0.5)1056 assert_fun(0)1057 elif input_text == "100084": # 激光自动关闭 右推杆下1058 setattr(Res, 'start_listener', True)1059 time.sleep(1)1060 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10008, "seq_cmd":10008, "data":{}}') # 激光自动关闭1061 time.sleep(2)1062 assert_fun(0)1063 setattr(Res, 'start_listener', True)1064 time.sleep(1)1065 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10012, "seq_cmd":10012, "data":{}}') # 右推杆下1066 time.sleep(0.5)1067 assert_fun(0)1068 setattr(Res, 'start_listener', True)1069 AGVManager.sendRosCommunicationRequestData(1070 '{"cmd_type":10100,"data":{"cancel_cmd":10012},"seq_cmd":1,"version":0}') # 取消上装1071 time.sleep(0.5)1072 assert_fun(0)1073 elif input_text == "100085": # 激光自动关闭 左右推杆上1074 setattr(Res, 'start_listener', True)1075 time.sleep(1)1076 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10008, "seq_cmd":10008, "data":{}}') # 激光自动关闭1077 time.sleep(2)1078 assert_fun(0)1079 setattr(Res, 'start_listener', True)1080 time.sleep(1)1081 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10013, "seq_cmd":10013, "data":{}}') # 右推杆下1082 time.sleep(0.5)1083 assert_fun(0)1084 setattr(Res, 'start_listener', True)1085 AGVManager.sendRosCommunicationRequestData(1086 '{"cmd_type":10100,"data":{"cancel_cmd":10013},"seq_cmd":1,"version":0}') # 取消上装1087 time.sleep(0.5)1088 assert_fun(0)1089 elif input_text == "100086": # 激光自动关闭 左右推杆下1090 setattr(Res, 'start_listener', True)1091 time.sleep(1)1092 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10008, "seq_cmd":10008, "data":{}}') # 激光自动关闭1093 time.sleep(2)1094 assert_fun(0)1095 setattr(Res, 'start_listener', True)1096 time.sleep(1)1097 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10014, "seq_cmd":10014, "data":{}}') # 左右推杆下1098 time.sleep(0.5)1099 assert_fun(0)1100 setattr(Res, 'start_listener', True)1101 
AGVManager.sendRosCommunicationRequestData(1102 '{"cmd_type":10100,"data":{"cancel_cmd":10014},"seq_cmd":1,"version":0}') # 取消上装1103 time.sleep(0.5)1104 assert_fun(0)1105 elif input_text == "100071": # 激光自动开启 左推杆上1106 setattr(Res, 'start_listener', True)1107 time.sleep(1)1108 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10007, "seq_cmd":10007, "data":{}}') # 激光自动开启1109 time.sleep(2)1110 assert_fun(0)1111 setattr(Res, 'start_listener', True)1112 time.sleep(1)1113 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10009, "seq_cmd":10009, "data":{}}') # 左推杆上1114 time.sleep(0.5)1115 assert_fun(1)1116 setattr(Res, 'start_listener', True)1117 AGVManager.sendRosCommunicationRequestData(1118 '{"cmd_type":10100,"data":{"cancel_cmd":10009},"seq_cmd":1,"version":0}') # 取消上装1119 time.sleep(0.5)1120 assert_fun(0)1121 elif input_text == "100072": # 激光自动开启 左推杆下1122 setattr(Res, 'start_listener', True)1123 time.sleep(1)1124 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10007, "seq_cmd":10007, "data":{}}') # 激光自动开启1125 time.sleep(2)1126 assert_fun(0)1127 setattr(Res, 'start_listener', True)1128 time.sleep(1)1129 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10010, "seq_cmd":10010, "data":{}}') # 左推杆下1130 time.sleep(0.5)1131 assert_fun(1)1132 setattr(Res, 'start_listener', True)1133 AGVManager.sendRosCommunicationRequestData(1134 '{"cmd_type":10100,"data":{"cancel_cmd":10010},"seq_cmd":1,"version":0}') # 取消上装1135 time.sleep(0.5)1136 assert_fun(0)1137 elif input_text == "100073": # 激光自动开启 右推杆上1138 setattr(Res, 'start_listener', True)1139 time.sleep(1)1140 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10007, "seq_cmd":10007, "data":{}}') # 激光自动开启1141 time.sleep(2)1142 assert_fun(0)1143 setattr(Res, 'start_listener', True)1144 time.sleep(1)1145 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10011, "seq_cmd":10011, "data":{}}') # 右推杆上1146 time.sleep(0.5)1147 assert_fun(1)1148 setattr(Res, 'start_listener', True)1149 
AGVManager.sendRosCommunicationRequestData(1150 '{"cmd_type":10100,"data":{"cancel_cmd":10011},"seq_cmd":1,"version":0}') # 取消上装1151 time.sleep(0.5)1152 assert_fun(0)1153 elif input_text == "100074": # 激光自动开启 右推杆下1154 setattr(Res, 'start_listener', True)1155 time.sleep(1)1156 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10007, "seq_cmd":10007, "data":{}}') # 激光自动开启1157 time.sleep(2)1158 assert_fun(0)1159 setattr(Res, 'start_listener', True)1160 time.sleep(1)1161 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10012, "seq_cmd":10012, "data":{}}') # 右推杆下1162 time.sleep(0.5)1163 assert_fun(1)1164 setattr(Res, 'start_listener', True)1165 AGVManager.sendRosCommunicationRequestData(1166 '{"cmd_type":10100,"data":{"cancel_cmd":10012},"seq_cmd":1,"version":0}') # 取消上装1167 time.sleep(0.5)1168 assert_fun(0)1169 elif input_text == "100075": # 激光自动开启 左右推杆上1170 setattr(Res, 'start_listener', True)1171 time.sleep(1)1172 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10007, "seq_cmd":10007, "data":{}}') # 激光自动开启1173 time.sleep(2)1174 assert_fun(0)1175 setattr(Res, 'start_listener', True)1176 time.sleep(1)1177 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10013, "seq_cmd":10013, "data":{}}') # 右推杆下1178 time.sleep(0.5)1179 assert_fun(0)1180 setattr(Res, 'start_listener', True)1181 AGVManager.sendRosCommunicationRequestData(1182 '{"cmd_type":10100,"data":{"cancel_cmd":10013},"seq_cmd":1,"version":0}') # 取消上装1183 time.sleep(0.5)1184 assert_fun(0)1185 elif input_text == "100076": # 激光自动开启 左右推杆下1186 setattr(Res, 'start_listener', True)1187 time.sleep(1)1188 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10007, "seq_cmd":10007, "data":{}}') # 激光自动开启1189 time.sleep(2)1190 assert_fun(0)1191 setattr(Res, 'start_listener', True)1192 time.sleep(1)1193 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10014, "seq_cmd":10014, "data":{}}') # 左右推杆下1194 time.sleep(0.5)1195 assert_fun(0)1196 setattr(Res, 'start_listener', True)1197 
AGVManager.sendRosCommunicationRequestData(1198 '{"cmd_type":10100,"data":{"cancel_cmd":10014},"seq_cmd":1,"version":0}') # 取消上装1199 time.sleep(0.5)1200 assert_fun(0)1201 elif input_text == "100171": # 振捣开启 确认在什么模式下开启1202 setattr(Res, 'start_listener', True)1203 time.sleep(1)1204 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10017, "seq_cmd":10017, "data":{}}') # 振捣开启1205 time.sleep(10)1206 assert_fun(0)1207 setattr(Res, 'start_listener', True)1208 time.sleep(1)1209 AGVManager.sendRosCommunicationRequestData('{"cmd_type":10018, "seq_cmd":10018, "data":{}}') # 振捣关闭1210 time.sleep(0.5)1211 assert_fun(0)1212 elif input_text == "90011": # 振捣频率设置-数值在区间内1213 setattr(Res, 'start_listener', True)1214 time.sleep(1)1215 AGVManager.sendRosCommunicationRequestData(1216 '{"cmd_type":9001,"data":{"value":1000},"seq_cmd":1,"version":0}')1217 time.sleep(1)1218 assert_fun(0)1219 elif input_text == "90012": # 振捣频率设置-数值等于最小值1220 setattr(Res, 'start_listener', True)1221 time.sleep(1)1222 AGVManager.sendRosCommunicationRequestData(1223 '{"cmd_type":9001,"data":{"value":1000},"seq_cmd":1,"version":0}')1224 time.sleep(1)1225 assert_fun(0)1226 elif input_text == "90013": # 振捣频率设置-数值等于最大值1227 setattr(Res, 'start_listener', True)1228 time.sleep(1)1229 AGVManager.sendRosCommunicationRequestData(1230 '{"cmd_type":9001,"data":{"value":1000},"seq_cmd":1,"version":0}')1231 time.sleep(1)1232 assert_fun(0)1233 elif input_text == "90014": # 振捣频率设置-数值小于最小值1234 setattr(Res, 'start_listener', True)1235 time.sleep(1)1236 AGVManager.sendRosCommunicationRequestData(1237 '{"cmd_type":9001,"data":{"value":1000},"seq_cmd":1,"version":0}')1238 time.sleep(1)1239 assert_fun(1)1240 elif input_text == "90015": # 振捣频率设置-数值大于最大值1241 setattr(Res, 'start_listener', True)1242 time.sleep(1)1243 AGVManager.sendRosCommunicationRequestData(1244 '{"cmd_type":9001,"data":{"value":1000},"seq_cmd":1,"version":0}')1245 time.sleep(1)1246 assert_fun(1)1247 elif input_text == "90016": # 振捣频率设置-数值为字符串1248 setattr(Res, 
'start_listener', True)1249 time.sleep(1)1250 AGVManager.sendRosCommunicationRequestData(1251 '{"cmd_type":9001,"data":{"value":"1000"},"seq_cmd":1,"version":0}')1252 time.sleep(1)1253 assert_fun(1)1254 elif input_text == "90017": # 振捣频率设置-数值为浮点数1255 setattr(Res, 'start_listener', True)1256 time.sleep(1)1257 AGVManager.sendRosCommunicationRequestData(1258 '{"cmd_type":9001,"data":{"value":1000},"seq_cmd":1,"version":0}')1259 time.sleep(1)1260 assert_fun(1)1261 elif input_text == "90018": # 振捣频率设置-数值为""1262 setattr(Res, 'start_listener', True)1263 time.sleep(1)1264 AGVManager.sendRosCommunicationRequestData(1265 '{"cmd_type":9001,"data":{"value":1000},"seq_cmd":1,"version":0}')1266 time.sleep(1)1267 assert_fun(1)1268 elif input_text == "90019": # 振捣频率设置-数值为None1269 setattr(Res, 'start_listener', True)1270 time.sleep(1)1271 AGVManager.sendRosCommunicationRequestData(1272 '{"cmd_type":9001,"data":{"value":None},"seq_cmd":1,"version":0}')1273 time.sleep(1)1274 assert_fun(1)1275 elif input_text == "900110": # 振捣频率设置-数值缺失1276 setattr(Res, 'start_listener', True)1277 time.sleep(1)1278 AGVManager.sendRosCommunicationRequestData(1279 '{"cmd_type":9001,"data":{"value":None},"seq_cmd":1,"version":0}')1280 time.sleep(1)1281 assert_fun(1)1282 elif input_text == "90021": # 水平高度设置-数值在区间内1283 setattr(Res, 'start_listener', True)1284 time.sleep(1)1285 AGVManager.sendRosCommunicationRequestData(1286 '{"cmd_type":9002,"data":{"value":1000},"seq_cmd":1,"version":0}')1287 time.sleep(1)1288 assert_fun(0)1289 elif input_text == "90022": # 水平高度设置-数值等于最小值1290 setattr(Res, 'start_listener', True)1291 time.sleep(1)1292 AGVManager.sendRosCommunicationRequestData(1293 '{"cmd_type":9002,"data":{"value":1000},"seq_cmd":1,"version":0}')1294 time.sleep(1)1295 assert_fun(0)1296 elif input_text == "90023": # 水平高度设置-数值等于最大值1297 setattr(Res, 'start_listener', True)1298 time.sleep(1)1299 AGVManager.sendRosCommunicationRequestData(1300 
'{"cmd_type":9002,"data":{"value":1000},"seq_cmd":1,"version":0}')1301 time.sleep(1)1302 assert_fun(0)1303 elif input_text == "90024": # 水平高度设置-数值小于最小值1304 setattr(Res, 'start_listener', True)1305 time.sleep(1)1306 AGVManager.sendRosCommunicationRequestData(1307 '{"cmd_type":9002,"data":{"value":1000},"seq_cmd":1,"version":0}')1308 time.sleep(1)1309 assert_fun(1)1310 elif input_text == "90025": # 水平高度设置-数值大于最大值1311 setattr(Res, 'start_listener', True)1312 time.sleep(1)1313 AGVManager.sendRosCommunicationRequestData(1314 '{"cmd_type":9002,"data":{"value":1000},"seq_cmd":1,"version":0}')1315 time.sleep(1)1316 assert_fun(1)1317 elif input_text == "90026": # 水平高度设置-数值为字符串1318 setattr(Res, 'start_listener', True)1319 time.sleep(1)1320 AGVManager.sendRosCommunicationRequestData(1321 '{"cmd_type":9002,"data":{"value":"1000"},"seq_cmd":1,"version":0}')1322 time.sleep(1)1323 assert_fun(1)1324 elif input_text == "90027": # 水平高度设置-数值为浮点数1325 setattr(Res, 'start_listener', True)1326 time.sleep(1)1327 AGVManager.sendRosCommunicationRequestData(1328 '{"cmd_type":9002,"data":{"value":1000.0},"seq_cmd":1,"version":0}')1329 time.sleep(1)1330 assert_fun(1)1331 elif input_text == "90028": # 水平高度设置-数值为""1332 setattr(Res, 'start_listener', True)1333 time.sleep(1)1334 AGVManager.sendRosCommunicationRequestData(1335 '{"cmd_type":9002,"data":{"value":""},"seq_cmd":1,"version":0}')1336 time.sleep(1)1337 assert_fun(1)1338 elif input_text == "90029": # 水平高度设置-数值为None1339 setattr(Res, 'start_listener', True)1340 time.sleep(1)1341 AGVManager.sendRosCommunicationRequestData(1342 '{"cmd_type":9002,"data":{"value":None},"seq_cmd":1,"version":0}')1343 time.sleep(1)1344 assert_fun(1)1345 elif input_text == "900210": # 水平高度设置-数值缺失1346 setattr(Res, 'start_listener', True)1347 time.sleep(1)1348 AGVManager.sendRosCommunicationRequestData(1349 '{"cmd_type":9002,"data":{},"seq_cmd":1,"version":0}')1350 time.sleep(1)1351 assert_fun(1)1352 elif input_text == "quit":1353 AGVManager.disconnectRobot()1354 
break1355 jpype.shutdownJVM() # 最后关闭jvm1356def assert_fun(result1):1357 data = getattr(Res, "data")1358 print("recv_data:" + str(data))1359 if data == result1:1360 result = "断言成功"1361 else:1362 result = "断言失败"1363 print("result:" + result)1364 setattr(Res, 'start_listener', False)1365 setattr(Res, 'data', None)...

Full Screen

Full Screen

resolver.py

Source:resolver.py Github

copy

Full Screen

...41 etcd_host, etcd_port)42 self._names = {}43 self._addr_cls = addr_cls or PlainAddress44 if start_listener:45 self.start_listener()46 def resolve(self, name):47 """Resolve gRPC service name.48 :param name: gRPC service name.49 :rtype list: A collection gRPC server address.50 """51 with self._lock:52 try:53 return self._names[name]54 except KeyError:55 addrs = self.get(name)56 self._names[name] = addrs57 return addrs58 def get(self, name):59 """Get values from Etcd.60 :param name: Etcd key prefix name.61 :rtype list: A collection of Etcd values.62 """63 keys = self._client.get_prefix(name)64 vals = []65 plain = True66 if self._addr_cls != PlainAddress:67 plain = False68 for val, metadata in keys:69 if plain:70 vals.append(self._addr_cls.from_value(val))71 else:72 add, addr = self._addr_cls.from_value(val)73 if add:74 vals.append(addr)75 return vals76 def update(self, **kwargs):77 """Add or delete service address.78 :param kwargs: Dictionary of ``'service_name': ((add-address, delete-address)).``79 """80 with self._lock:81 for name, (add, delete) in kwargs.items():82 try:83 self._names[name].extend(add)84 except KeyError:85 self._names[name] = add86 for del_item in delete:87 try:88 self._names[name].remove(del_item)89 except ValueError:90 continue91 def listen(self):92 """Listen for change about gRPC service address."""93 while not self._stopped:94 for name in self._names:95 try:96 vals = self.get(name)97 except:98 continue99 else:100 with self._lock:101 self._names[name] = vals102 time.sleep(self._listen_timeout)103 def start_listener(self, daemon=True):104 """Start listen thread.105 :param daemon: Indicate whether start thread as a daemon.106 """107 if self._listening:108 return109 thread_name = 'Thread-resolver-listener'110 self._listen_thread = threading.Thread(111 target=self.listen, name=thread_name)112 self._listen_thread.daemon = daemon113 self._listen_thread.start()114 self._listening = True115 def stop(self):116 """Stop service resolver."""117 if 
self._stopped:118 return119 self._stopped = True120 def __del__(self):121 self.stop()122class ZkServiceResolver(ServiceResolver):123 def __init__(self, zkServers, start_listener=True, connect_timeout=30, listen_timeout=5):124 self._client = KazooClient(hosts=zkServers, timeout=connect_timeout)125 self._listen_timeout = listen_timeout126 def resolve(self, name):127 pass128 def update(self, **kwargs):129 pass130 def listen(self):131 pass132class EurekaServiceResolver(ServiceResolver):133 def __init__(self, eureka_servers, resolver_name='eureka_nameresolver', start_listener=True, listen_timeout=30, connect_timeout=30):134 self._listening = False135 self._stopped = False136 self._listen_thread = None137 self._listen_timeout = listen_timeout138 self._lock = threading.Lock()139 instance_id = f'{resolver_name}:{self.get_ip()}'140 self._client = eureka_client.init(eureka_server=eureka_servers, app_name=resolver_name, instance_id=instance_id)141 self._names = {}142 if start_listener:143 self.start_listener()144 self._listen_timeout = listen_timeout145 def get_ip(self):146 hostname = socket.gethostname()147 # 获取本机ip148 ip = socket.gethostbyname(hostname)149 return ip150 def resolve(self, name):151 """Resolve gRPC service name.152 :param name: gRPC service name.153 :rtype list: A collection gRPC server address.154 """155 with self._lock:156 try:157 return self._names[name]158 except KeyError:159 addrs = self.get(name)160 self._names[name] = addrs161 return addrs162 def get(self, name):163 """Get values from eureka.164 :param name: eureka service name.165 :rtype list: A collection of eureka values.166 """167 upper_name = name.upper()168 app = self._client.applications.get_application(upper_name)169 addrs = list()170 for up_instance in app.up_instances:171 addr = f"{up_instance.ipAddr}:{up_instance.metadata['gRPC.port']}"172 addrs.append(addr)173 return addrs174 def update(self, **kwargs):175 """Add or delete service address.176 :param kwargs: Dictionary of ``'service_name': 
((add-address, delete-address)).``177 """178 with self._lock:179 for name, (add, delete) in kwargs.items():180 try:181 self._names[name].extend(add)182 except KeyError:183 self._names[name] = add184 for del_item in delete:185 try:186 self._names[name].remove(del_item)187 except ValueError:188 continue189 def listen(self):190 """Listen for change about gRPC service address."""191 while not self._stopped:192 for name in self._names:193 try:194 vals = self.get(name)195 except:196 continue197 else:198 with self._lock:199 self._names[name] = vals200 time.sleep(self._listen_timeout)201 def start_listener(self, daemon=True):202 """Start listen thread.203 :param daemon: Indicate whether start thread as a daemon.204 """205 if self._listening:206 return207 thread_name = 'Thread-resolver-listener'208 self._listen_thread = threading.Thread(209 target=self.listen, name=thread_name)210 self._listen_thread.daemon = daemon211 self._listen_thread.start()212 self._listening = True213 def stop(self):214 """Stop service resolver."""215 if self._stopped:...

Full Screen

Full Screen

__main__.py

Source:__main__.py Github

copy

Full Screen

...41async def _on_started(event:hikari.StartedEvent) -> None:42 channel = await bot.rest.fetch_channel(bot_config["logging"]["startup"])43 await channel.send("Bot has Started")44 view = pingroles()45 view.start_listener()46 view2 = verify()47 view2.start_listener()48 view3 = botaccess()49 view3.start_listener()50 view4 = karutaaccess()51 view4.start_listener()52 view5 = colorroles()53 view5.start_listener()54 view6 = bioroles()55 view6.start_listener()56 # view3 = giveaway_view()57 # view3.start_listener()58@bot.listen(hikari.StoppingEvent)59async def _on_ended(event:hikari.StoppingEvent) -> None:60 channel = await bot.rest.fetch_channel(bot_config["logging"]["startup"])61 await channel.send("Bot has Stopped")62@bot.command()63@lightbulb.option("extension", "The extension to reload", modifier=lightbulb.commands.base.OptionModifier(3))64@lightbulb.add_checks(lightbulb.owner_only)65@lightbulb.command("reload", "reload a bots extension")66@lightbulb.implements(lightbulb.PrefixCommand)67async def _reload(ctx: lightbulb.Context) -> None:68 extension = ctx.options.extension69 try:70 ctx.bot.reload_extensions(f"bot.extensions.{extension}")71 embed = hikari.Embed(description=f"Reloaded extention {extension}", color=bot_config["color"]["default"])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful