How to use _receive_msg method in autotest

Best Python code snippet using autotest_python

fh0a.py

Source:fh0a.py Github

copy

Full Screen

...21 :param wait_time: 等待时间,单位为秒22 """23 jsSleepWithCallbackEvery(24 wait_time * 1000, 50,25 lambda: self._receive_msg()26 )27 # startTime = time.time() * 100028 # endTime = time.time() * 100029 # last = 030 # while endTime - startTime < wait_time * 1000:31 # if (endTime - startTime) > (last * 50):32 # self._receive_msg()33 # last = last + 134 # endTime = time.time() * 100035 # pass36 def _split_state(self, acc: Dict, x: str) -> Dict:37 if ':' in x:38 p = x.split(':')39 acc[p[0]] = p[1]40 return acc41 def _receive_msg(self) -> None:42 """43 解析返回数据44 :return: None45 """46 msgs: List[str] = getBufMsgList().split('\n')47 for msg in msgs:48 m: List[str] = msg.split(' ')49 if len(m) >= 3:50 if m[1] == '0' and m[2] == 'status':51 states: str = m[3]52 st: Dict[str, Any] = reduce(53 self._split_state,54 states.split(';'),55 {}56 )57 if m[0] in self.uav_statement:58 self.uav_statement[m[0]].update(st)59 else:60 self.uav_statement[m[0]] = st61 # update `is_flying` state from h/lock_flag info62 if 'lock_flag' in self.uav_statement[m[0]]:63 self.uav_statement[m[0]]['is_flying'] = self.uav_statement[m[0]]['lock_flag']64 if 'loc_x' in self.uav_statement[m[0]]:65 self.uav_statement[m[0]]['x'] = self.uav_statement[m[0]]['loc_x']66 if 'loc_y' in self.uav_statement[m[0]]:67 self.uav_statement[m[0]]['y'] = self.uav_statement[m[0]]['loc_y']68 if 'high' in self.uav_statement[m[0]]:69 self.uav_statement[m[0]]['h'] = self.uav_statement[m[0]]['high']70 elif m[1] != '0':71 # cmd table72 cId = int(m[1]) - 173 if cId in self.cmd_table:74 tt = self.cmd_table[cId]75 self.cmd_table[cId] = (tt[0], msg)76 pass77 pass78 def _sendCmd(self, command: str, cmdId: int) -> bool:79 self.tag = self.tag + 180 # cmd table81 self.cmd_table[cmdId] = (command, None)82 self._receive_msg()83 return sendCmd(command)84 def _open(self, port: str) -> bool:85 """86 command函数用于连接无人机87 :param port:飞鸿0A无人机为端口号 COM3 ,TT无人机为ip地址88 :return:连接状态是否成功,是则返回True,否则返回False89 """90 # if port in self.uav_statement:91 # return True92 # command = port + ' ' + str(self.tag * 2 + 1) + ' command'93 command = f"{port} {self.tag * 2 + 1} open"94 back = self._sendCmd(command, self.tag * 2 + 1)95 # return back96 return True97 # 飞鸿的ip字符串为端口号,TT的ip字符串为ip地址98 def add_uav(self, port: str) -> None:99 """100 input_uav函数用于添加无人机101 :param port:飞鸿0A无人机的ip字符串为端口号,TT无人机的ip字符串为ip地址102 """103 if self._open(port):104 y = {'x': '', 'y': '', 'h': '', 'is_flying': False}105 self.uav_statement[port] = y106 self._receive_msg()107 def get_position(self, port: str) -> Tuple[str, str, str]:108 """109 get_position函数用于获取无人机当前位置110 :param port: 无人机端口号111 :return:h,x,y112 """113 self._receive_msg()114 if port in self.uav_statement:115 st: Dict[str, Any] = self.uav_statement[port]116 return st['x'], st['y'], st['h']117 h = x = y = ''118 return h, x, y119 def get_state(self, port: str) -> Union[Dict, None]:120 """121 get_position函数用于获取无人机当前位置122 :param port: 无人机端口号123 :return:h,x,y124 """125 self._receive_msg()126 if port in self.uav_statement:127 st: Dict[str, Any] = self.uav_statement[port]128 return st129 return None130 def is_tag_ok(self, port: str):131 """132 is_dot_ok 函数用于检查无人机当前是否已经检测到指定二维码133 :param port: 无人机端口号134 :return: "0" or "1"135 """136 return self.uav_statement[port].get('is_tag_ok', '0')137 def is_dot_ok(self, port: str):138 """139 is_dot_ok 函数用于检查无人机当前是否已经检测到指定颜色色块 或 已经检测到交点140 :param port: 无人机端口号141 :return: "0" or "1"142 """143 return self.uav_statement[port].get('is_dot_ok', '0')144 def show_uav_list(self) -> None:145 """146 show_uav_list函数用于查看所有无人机147 :return:string148 """149 self._receive_msg()150 for (port, state) in self.uav_statement.items():151 print("port: {0} state: {1}".format(port, state))152 def _send_commond_without_return(self, command: str, cmdId: int) -> bool:153 return self._sendCmd(str(command), cmdId)154 def _send_commond_with_return(self, command: str, cmdId: int, timeout: int = RESPONSE_TIMEOUT) -> Union[str, None]:155 self._sendCmd(str(command), cmdId)156 timestamp = time.time()157 while 1:158 jsSleepWithCallbackEvery(159 200, 20,160 lambda: self._receive_msg()161 )162 if time.time() - timestamp > timeout:163 return None164 if self.cmd_table[cmdId][1] != None:165 continue166 else:167 return self.cmd_table[cmdId][1]168 def land(self, port: str) -> Literal[True]:169 """170 land函数用于控制无人机降落171 :param port: 无人机端口号172 """173 self._receive_msg()174 # if not self.uav_statement[port]['is_flying']:175 # return True176 command = f"{port} {self.tag * 2 + 1} land"177 back = self._send_commond_without_return(command, self.tag * 2 + 1)178 # back = self._send_commond_with_return(command, self.tag * 2 + 1, timeout = 20)179 self.uav_statement[port]['is_flying'] = False180 return True181 def emergency(self, port: str) -> Literal[True]:182 """183 emergency 函数用于控制无人机紧急降落184 :param port: 无人机端口号185 """186 self._receive_msg()187 # if not self.uav_statement[port]['is_flying']:188 # return True189 command = f"{port} {self.tag * 2 + 1} emergency"190 back = self._send_commond_without_return(command, self.tag * 2 + 1)191 # back = self._send_commond_with_return(command, self.tag * 2 + 1, timeout = 20)192 self.uav_statement[port]['is_flying'] = False193 return True194 def takeoff(self, port: str, high: int) -> Literal[True]:195 """196 takeoff函数用于控制无人机起飞197 :param port: 无人机端口号198 :param high:起飞高度(厘米)199 """200 self._receive_msg()201 # if self.uav_statement[port]['is_flying']:202 # return True203 command = f"{port} {self.tag * 2 + 1} takeoff {high}"204 back = self._send_commond_without_return(command, self.tag * 2 + 1)205 # back = self._send_commond_with_return(command, self.tag * 2 + 1, timeout = 20)206 self.uav_statement[port]['is_flying'] = True207 return True208 def up(self, port: str, distance: int) -> bool:209 """210 up 向上移动211 :param port: 无人机端口号212 :param distance:移动距离(厘米)213 """214 self._receive_msg()215 if not self.uav_statement[port]['is_flying']:216 return False217 command = f"{port} {self.tag * 2 + 1} up {distance}"218 self._send_commond_without_return(command, self.tag * 2 + 1)219 return True220 def down(self, port: str, distance: int) -> bool:221 """222 down 向下移动223 :param port: 无人机端口号224 :param distance:移动距离(厘米)225 """226 self._receive_msg()227 if not self.uav_statement[port]['is_flying']:228 return False229 command = f"{port} {self.tag * 2 + 1} down {distance}"230 self._send_commond_without_return(command, self.tag * 2 + 1)231 return True232 def forward(self, port: str, distance: int) -> bool:233 """234 forward 向前移动235 :param port: 无人机端口号236 :param distance:移动距离(厘米)237 """238 self._receive_msg()239 if not self.uav_statement[port]['is_flying']:240 return False241 command = f"{port} {self.tag * 2 + 1} forward {distance}"242 self._send_commond_without_return(command, self.tag * 2 + 1)243 return True244 def back(self, port: str, distance: int) -> bool:245 """246 back 向后移动247 :param port: 无人机端口号248 :param distance:移动距离(厘米)249 """250 self._receive_msg()251 if not self.uav_statement[port]['is_flying']:252 return False253 command = f"{port} {self.tag * 2 + 1} back {distance}"254 self._send_commond_without_return(command, self.tag * 2 + 1)255 return True256 def left(self, port: str, distance: int) -> bool:257 """258 left 向左移动259 :param port: 无人机端口号260 :param distance:移动距离(厘米)261 """262 self._receive_msg()263 if not self.uav_statement[port]['is_flying']:264 return False265 command = f"{port} {self.tag * 2 + 1} left {distance}"266 self._send_commond_without_return(command, self.tag * 2 + 1)267 return True268 def right(self, port: str, distance: int) -> bool:269 """270 right 向右移动271 :param port: 无人机端口号272 :param distance:移动距离(厘米)273 """274 self._receive_msg()275 if not self.uav_statement[port]['is_flying']:276 return False277 command = f"{port} {self.tag * 2 + 1} right {distance}"278 self._send_commond_without_return(command, self.tag * 2 + 1)279 return True280 def _move(self, port: str, direct: int, distance: int) -> bool:281 """282 move 函数用于控制无人机移动283 :param port: 无人机端口号284 :param direct:移动方向(1上2下3前4后5左6右)285 :param distance:移动距离(厘米)286 """287 self._receive_msg()288 if not self.uav_statement[port]['is_flying']:289 return False290 command = f"{port} {self.tag * 2 + 1} move {direct} {distance}"291 self._send_commond_without_return(command, self.tag * 2 + 1)292 return True293 def goto(self, port: str, x: int, y: int, h: int) -> bool:294 """295 goto 函数用于控制无人机到达指定位置296 :param port: 无人机端口号297 :param x: x轴方向位置(厘米)298 :param y: y轴方向位置(厘米)299 :param h: 高度(厘米)300 """301 self._receive_msg()302 if not self.uav_statement[port]['is_flying']:303 return False304 command = f"{port} {self.tag * 2 + 1} goto {x} {y} {h}"305 self._send_commond_without_return(command, self.tag * 2 + 1)306 return True307 def flip(self, port: str, direction: str) -> bool:308 """309 flip函数用于控制无人机翻滚310 :param port: 无人机端口号311 :param direction: 翻滚方向(f前 b后 l左 r右)312 """313 self._receive_msg()314 if not self.uav_statement[port]['is_flying']:315 return False316 command = f"{port} {self.tag * 2 + 1} flip {direction} 1"317 self._send_commond_without_return(command, self.tag * 2 + 1)318 return True319 def rotate(self, port: str, degree: int) -> bool:320 """321 rotate函数用于控制无人机自转322 :param port: 无人机端口号323 :param degree: 自转方向和大小(正数顺时针,负数逆时针,单位为度数)324 """325 self._receive_msg()326 if not self.uav_statement[port]['is_flying']:327 return False328 command = f"{port} {self.tag * 2 + 1} rotate {degree}"329 self._send_commond_without_return(command, self.tag * 2 + 1)330 return True331 def cw(self, port: str, degree: int) -> bool:332 """333 cw 控制无人机顺时针自转334 :param port: 无人机端口号335 :param degree: 自转角度度数336 """337 self._receive_msg()338 if not self.uav_statement[port]['is_flying']:339 return False340 command = f"{port} {self.tag * 2 + 1} cw {degree}"341 self._send_commond_without_return(command, self.tag * 2 + 1)342 return True343 def ccw(self, port: str, degree: int) -> bool:344 """345 ccw 函数用于控制无人机逆时针自转346 :param port: 无人机端口号347 :param degree: 自转角度度数348 """349 self._receive_msg()350 if not self.uav_statement[port]['is_flying']:351 return False352 command = f"{port} {self.tag * 2 + 1} ccw {degree}"353 self._send_commond_without_return(command, self.tag * 2 + 1)354 return True355 def speed(self, port: str, speed: int) -> bool:356 """357 speed函数用于控制无人机飞行速度358 :param port: 无人机端口号359 :param speed: 飞行速度(0-200厘米/秒)360 """361 self._receive_msg()362 # if not self.uav_statement[port]['is_flying']:363 # return False364 command = f"{port} {self.tag * 2 + 1} setSpeed {speed}"365 self._send_commond_without_return(command, self.tag * 2 + 1)366 return True367 def high(self, port: str, high: int) -> bool:368 """369 high用于控制无人机飞行高度370 :param port: 无人机端口号371 :param high: 飞行高度(厘米)372 """373 self._receive_msg()374 if not self.uav_statement[port]['is_flying']:375 return False376 command = f"{port} {self.tag * 2 + 1} setHeight {high}"377 self._send_commond_without_return(command, self.tag * 2 + 1)378 return True379 def led(self, port: str, r: int, g: int, b: int) -> bool:380 """381 led函数控制无人机灯光382 :param port: 无人机端口号383 :param r: 灯光颜色R通道384 :param g: 灯光颜色G通道385 :param b: 灯光颜色B通道386 """387 self._receive_msg()388 # if not self.uav_statement[port]['is_flying']:389 # return False390 command = f"{port} {self.tag * 2 + 1} light {r} {g} {b}"391 self._send_commond_without_return(command, self.tag * 2 + 1)392 return True393 def bln(self, port: str, r: int, g: int, b: int) -> bool:394 """395 led函数控制无人机灯光,呼吸灯396 :param port: 无人机端口号397 :param r: 灯光颜色R通道398 :param g: 灯光颜色G通道399 :param b: 灯光颜色B通道400 """401 self._receive_msg()402 # if not self.uav_statement[port]['is_flying']:403 # return False404 command = f"{port} {self.tag * 2 + 1} bln {r} {g} {b}"405 self._send_commond_without_return(command, self.tag * 2 + 1)406 return True407 def rainbow(self, port: str, r: int, g: int, b: int) -> bool:408 """409 led函数控制无人机灯光,七彩变换410 :param port: 无人机端口号411 :param r: 灯光颜色R通道412 :param g: 灯光颜色G通道413 :param b: 灯光颜色B通道414 """415 self._receive_msg()416 # if not self.uav_statement[port]['is_flying']:417 # return False418 command = f"{port} {self.tag * 2 + 1} rainbow {r} {g} {b}"419 self._send_commond_without_return(command, self.tag * 2 + 1)420 return True421 def mode(self, port: str, mode: int) -> bool:422 """423 mode函数用于切换飞行模式424 :param port: 无人机端口号425 :param mode:飞行模式(1常规2巡线3跟随4单机编队)426 """427 self._receive_msg()428 # if not self.uav_statement[port]['is_flying']:429 # return False430 command = f"{port} {self.tag * 2 + 1} airplane_mode {mode}"431 self._send_commond_without_return(command, self.tag * 2 + 1)432 return True433 # def _visionMode(self, port: str, mode: int) -> bool:434 # """435 # visionMode函数用于设置视觉工作模式436 # :param port: 无人机端口号437 # :param mode:视觉工作模式(1点检测2线检测3标签检测4二维码扫描5条形码扫描)438 # """439 # self._receive_msg()440 # if not self.uav_statement[port]['is_flying']:441 # return False442 # command = f"{port} {self.tag * 2 + 1} visionMode {mode}"443 # self._send_commond_without_return(command, self.tag * 2 + 1)444 # return True445 def color_detect(self, port: str, L_L: int, L_H: int, A_L: int, A_H: int, B_L: int, B_H: int) -> bool:446 """447 visionColor函数用于设置视觉工作模式为色块检测448 :param port: 无人机端口号449 :param L_L: 色块L通道的最低检测值450 :param L_H: 色块L通道的最高检测植451 :param A_L: 色块A通道的最低检测植452 :param A_H: 色块A通道的最高检测值453 :param B_L: 色块B通道的最低检测植454 :param B_H: 色块B通道的最高检测植z455 """456 self._receive_msg()457 # if not self.uav_statement[port]['is_flying']:458 # return False459 command = f"{port} {self.tag * 2 + 1} colorDetect {L_L} {L_H} {A_L} {A_H} {B_L} {B_H}"460 self._send_commond_without_return(command, self.tag * 2 + 1)461 return True462 def color_detect_label(self, port: str, label: str) -> bool:463 """464 visionColor函数用于设置视觉工作模式为色块检测465 :param port: 无人机端口号466 :param label: 预定义色彩标签名467 """468 self._receive_msg()469 # if not self.uav_statement[port]['is_flying']:470 # return False471 command = f"{port} {self.tag * 2 + 1} colorDetectLabel {label}"472 self._send_commond_without_return(command, self.tag * 2 + 1)473 return True474 # def patrol_line_direction(self, port: str, direction: int) -> bool:475 # """476 # patrol_line_direction函数用于切换无人机巡线方向477 # :param port: 无人机端口号478 # :param direction:巡线方向(1前2后3左4右)479 # """480 # self._receive_msg()481 # if not self.uav_statement[port]['is_flying']:482 # return False483 # command = f"{port} {self.tag * 2 + 1} patrol_line_direction {direction}"484 # self._send_commond_without_return(command, self.tag * 2 + 1)485 # return True486 # def identify_label(self, port: str, id: int) -> bool:487 # """488 # identify_label函数用于指定识别某个标签号489 # :param port: 无人机端口号490 # :param id:目标标签号,设置后只识别该号标签491 # """492 # self._receive_msg()493 # if not self.uav_statement[port]['is_flying']:494 # return False495 # command = f"{port} {self.tag * 2 + 1} identify_label {id}"496 # self._send_commond_without_return(command, self.tag * 2 + 1)497 # return True498 #499 # def toward_move_label(self, port: str, direction: int, distance: int, id: int) -> bool:500 # """501 # toward_move_label函数指定无人机移动某距离寻找某号标签502 # :param port: 无人机端口号503 # :param direction:移动方向(1上2下3前4后5左6右)504 # :param distance:移动距离(厘米)505 # :param id:目标标签号,移动过程中看到该标签会自动悬停在上方506 # """507 # self._receive_msg()508 # if not self.uav_statement[port]['is_flying']:509 # return False510 # command = f"{port} {self.tag * 2 + 1} toward_move_label {direction} {distance} {id}"511 # self._send_commond_without_return(command, self.tag * 2 + 1)512 # return True513 #514 # def move_up_label(self, port: str, distance: int, id: int) -> bool:515 # """516 # move_up_label 函数指定无人机向上移动某距离寻找某号标签517 # :param port: 无人机端口号518 # :param distance:移动距离(厘米)519 # :param id:目标标签号,移动过程中看到该标签会自动悬停在上方520 # """521 # self._receive_msg()522 # if not self.uav_statement[port]['is_flying']:523 # return False524 # command = f"{port} {self.tag * 2 + 1} move_up_label {distance} {id}"525 # self._send_commond_without_return(command, self.tag * 2 + 1)526 # return True527 #528 # def move_down_label(self, port: str, distance: int, id: int) -> bool:529 # """530 # move_down_label 函数指定无人机向下移动某距离寻找某号标签531 # :param port: 无人机端口号532 # :param distance:移动距离(厘米)533 # :param id:目标标签号,移动过程中看到该标签会自动悬停在上方534 # """535 # self._receive_msg()536 # if not self.uav_statement[port]['is_flying']:537 # return False538 # command = f"{port} {self.tag * 2 + 1} move_down_label {distance} {id}"539 # self._send_commond_without_return(command, self.tag * 2 + 1)540 # return True541 #542 # def move_forward_label(self, port: str, distance: int, id: int) -> bool:543 # """544 # move_forward_label 函数指定无人机向前移动某距离寻找某号标签545 # :param port: 无人机端口号546 # :param distance:移动距离(厘米)547 # :param id:目标标签号,移动过程中看到该标签会自动悬停在上方548 # """549 # self._receive_msg()550 # if not self.uav_statement[port]['is_flying']:551 # return False552 # command = f"{port} {self.tag * 2 + 1} move_forward_label {distance} {id}"553 # self._send_commond_without_return(command, self.tag * 2 + 1)554 # return True555 #556 # def move_back_label(self, port: str, distance: int, id: int) -> bool:557 # """558 # move_back_label 函数指定无人机向后移动某距离寻找某号标签559 # :param port: 无人机端口号560 # :param distance:移动距离(厘米)561 # :param id:目标标签号,移动过程中看到该标签会自动悬停在上方562 # """563 # self._receive_msg()564 # if not self.uav_statement[port]['is_flying']:565 # return False566 # command = f"{port} {self.tag * 2 + 1} move_back_label {distance} {id}"567 # self._send_commond_without_return(command, self.tag * 2 + 1)568 # return True569 #570 # def move_left_label(self, port: str, distance: int, id: int) -> bool:571 # """572 # move_left_label 函数指定无人机向左移动某距离寻找某号标签573 # :param port: 无人机端口号574 # :param distance:移动距离(厘米)575 # :param id:目标标签号,移动过程中看到该标签会自动悬停在上方576 # """577 # self._receive_msg()578 # if not self.uav_statement[port]['is_flying']:579 # return False580 # command = f"{port} {self.tag * 2 + 1} move_left_label {distance} {id}"581 # self._send_commond_without_return(command, self.tag * 2 + 1)582 # return True583 #584 # def move_right_label(self, port: str, distance: int, id: int) -> bool:585 # """586 # move_right_label 函数指定无人机向右移动某距离寻找某号标签587 # :param port: 无人机端口号588 # :param distance:移动距离(厘米)589 # :param id:目标标签号,移动过程中看到该标签会自动悬停在上方590 # """591 # self._receive_msg()592 # if not self.uav_statement[port]['is_flying']:593 # return False594 # command = f"{port} {self.tag * 2 + 1} move_right_label {distance} {id}"595 # self._send_commond_without_return(command, self.tag * 2 + 1)596 # return True597 # def obstacle_range(self, port: str, distance: int) -> bool:598 # """599 # obstacle_range函数用于设置障碍物检测范围600 # :param port: 无人机端口号601 # :param distance:检测范围(厘米)602 # """603 # self._receive_msg()604 # if not self.uav_statement[port]['is_flying']:605 # return False606 # command = f"{port} {self.tag * 2 + 1} obstacle_range {distance}"607 # self._send_commond_without_return(command, self.tag * 2 + 1)608 # return True609 # def solenoid(self, port: str, switch: int) -> bool:610 # """611 # solenoid函数用于无人机电磁铁控制612 # :param port: 无人机端口号613 # :param switch:电磁铁控制(0关闭1打开)614 # """615 # self._receive_msg()616 # if not self.uav_statement[port]['is_flying']:617 # return False618 # command = f"{port} {self.tag * 2 + 1} solenoid {switch}"619 # self._send_commond_without_return(command, self.tag * 2 + 1)620 # return True621 # def steering(self, port: str, angle: int) -> bool:622 # """623 # steering函数用于无人机舵机控制624 # :param port: 无人机端口号625 # :param angle:舵机角度(+/-90度)626 # """627 # self._receive_msg()628 # if not self.uav_statement[port]['is_flying']:629 # return False630 # command = f"{port} {self.tag * 2 + 1} steering {angle}"631 # self._send_commond_without_return(command, self.tag * 2 + 1)632 # return True633 def stop(self, port: str) -> bool:634 return self.hover(port)635 def hover(self, port: str) -> bool:636 """637 hover 函数用于控制无人机悬停638 :param port: 无人机端口号639 """640 self._receive_msg()641 if not self.uav_statement[port]['is_flying']:642 return False643 command = f"{port} {self.tag * 2 + 1} hover"644 self._send_commond_without_return(command, self.tag * 2 + 1)645 return True646 # def end(self) -> None:647 # """648 # end函数用于降落所有编队无人机649 # """650 # for (port, uav) in self.uav_statement.items():651 # if uav['is_flying']:652 # self.land(port)653 def set_single_setting(self, port: str, mode: int, channel: int, address: int):654 """655 设置单机设置656 :param port: 无人机端口号657 :param mode: 0低速1中速2高速658 :param channel: 通信信道0~125659 :param address: 通信地址660 :return:661 """662 self._receive_msg()663 command = f"{port} {self.tag * 2 + 1} single_setting {mode} {channel} {address}"664 self._send_commond_without_return(command, self.tag * 2 + 1)665 pass666 def set_multiply_setting(self, port: str, mode: int, airplaneNumber: int, channel: int, address: int):667 """668 设置多机模式设置669 :param port: 无人机端口号670 :param mode: 0单机模式1编队模式671 :param airplaneNumber: 飞机编号672 :param channel: 通信信道0~125673 :param address: 通信地址674 :return:675 """676 self._receive_msg()677 command = f"{port} {self.tag * 2 + 1} multiply_setting {mode} {airplaneNumber} {channel} {address}"678 self._send_commond_without_return(command, self.tag * 2 + 1)679 return True680 def read_multi_setting(self, port: str):681 """682 读取多机模式设置683 :param port: 无人机端口号684 :return:685 """686 self._receive_msg()687 command = f"{port} {self.tag * 2 + 1} read_multi_setting"688 self._send_commond_without_return(command, self.tag * 2 + 1)689 return True690 def read_single_setting(self, port: str):691 """692 读取单机模式设置693 :param port: 无人机端口号694 :return:695 """696 self._receive_msg()697 command = f"{port} {self.tag * 2 + 1} read_single_setting"698 self._send_commond_without_return(command, self.tag * 2 + 1)699 return True700 def read_hardware_setting(self, port: str):701 """702 读取硬件信息703 :param port: 无人机端口号704 :return:705 """706 self._receive_msg()707 command = f"{port} {self.tag * 2 + 1} read_hardware_setting"708 self._send_commond_without_return(command, self.tag * 2 + 1)709 return True710 def cleanup(self):711 """712 cleanup函数用于清理内部状态表713 :return:714 """715 self.uav_statement = {}716 self.cmd_table = {}717 # def __del__(self):...

Full Screen

Full Screen

sock_chat.py

Source:sock_chat.py Github

copy

Full Screen

...27 def _send_msg(self, message, remote_socket):28 remote_socket.send(bytes(message,"utf-8"))29 logging.info(f"{self} sent message: {message}")30 31 def _receive_msg(self, remote_socket):32 message = remote_socket.recv(RECV_SIZE).decode("utf-8")33 logging.info(f"{self} received message: {message}")34 return message35class Client(Socket):36 def __init__(self, id) -> None:37 self.id = id38 self.socket = self._client_socket()39 greeting = self._receive_msg(self.socket)40 print(greeting)41 def Send_msg(self,receiver_id, message_body):42 message_header = f"<{self.id}#{receiver_id}>"43 message = message_header + message_body44 request = f'Send_msg("{message}")'45 46 self._send_msg(request, self.socket)47 48 def Check_msg(self): #-> str:49 messages = self._server_fetch_msg()50 logging.info(f"The messages are: {messages}")51 return messages52 def _server_fetch_msg(self):53 request = f'Check_msg({self.id})'54 self._send_msg(request, self.socket)55 messages = []56 msg = " "57 # while msg != "":58 msg = self._receive_msg(self.socket)59 messages.append(msg)60 return messages61class Server(Socket):62 def __init__(self) -> None:63 self.pending_msg = {}64 # self.clients = []65 # self.conection_ids = []66 # conversation = ()67 self.socket = self._server_socket()68 listening = threading.Thread(target=self._accept_conections)69 listening.start()70 def _accept_conections(self):71 while True:72 client_socket, address = self.socket.accept()73 print(f"Connection from {address} has been established!")74 self._send_msg("Welcome to the server!", client_socket)75 # msg = self._receive_msg(clientsocket)76 # self.clients.append(client_socket)77 handler_thread = threading.Thread(target=self._handle_connection, args=(client_socket,))78 handler_thread.start()79 def _handle_connection(self, client_sock):80 while True:81 try:82 message = self._receive_msg(client_sock)83 self._handle_msg(message, client_sock)84 except:85 logging.error("some error!")86 client_sock.close()87 break88 89 def _handle_msg(self, message, client_sock):90 # handle = {91 # "Check_msg" : self._fetch_msg,92 # "Send_msg" : self._receive_chat93 # }94 logging.info(f"message to handle: {message}")95 request, body = message[:-1].split("(")96 # handle[request](body)...

Full Screen

Full Screen

client.py

Source:client.py Github

copy

Full Screen

...30 super().__init__()31 self._socket_host = socket_host32 self._socket_port = socket_port33 self.server_url = urllib.parse.urlsplit(f"http://{host}:{port}/")34 async def _receive_msg(self):35 reader, writer = await asyncio.open_connection(self._socket_host, self._socket_port)36 data = await reader.read(100)37 msg = NMEASentence(data.decode())38 return msg 39 async def send_message(self):40 """Send messages from queue"""41 url = self.server_url42 msg = await self._receive_msg()43# if url.scheme == 'https':44# reader, writer = await asyncio.open_connection(45# url.hostname, 443, ssl=True)46# else:47 reader, writer = await asyncio.open_connection(48 url.hostname, url.port)49 data = json.dumps(msg.as_dict())50 query = (51 f"POST {url.path or '/'} HTTP/1.0\r\n"52 f"Host: {url.hostname}\r\n"53 f"User-Agent: python/requests2.0.1\r\n"54 f"Content-Length: {len(data)}\r\n"55 f"\r\n{data}\r\n"56 )...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful