How to use the touchMove method in fMBT

Best Python code snippet using fMBT_python
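Before the indexed snippets below, here is a minimal, untested sketch of driving touchMove directly from fMBT's GUI test interface. It assumes an Android target and that fmbtandroid.Device inherits the touchDown/touchMove/touchUp primitives from fmbtgti.GUITestInterface; verify against your fMBT version (the higher-level drag() performs the same press-move-release sequence).

# Minimal sketch: a manual swipe built from fMBT touch primitives.
# Assumption: fmbtandroid.Device exposes touchDown/touchMove/touchUp
# (inherited from fmbtgti.GUITestInterface) -- check your fMBT version.
import time
import fmbtandroid

d = fmbtandroid.Device()      # connects to the default adb device
d.touchDown(100, 400)         # finger down at the start point
for step in range(1, 11):
    d.touchMove(100 + step * 40, 400)  # slide right in ten small steps
    time.sleep(0.05)          # pacing so the UI registers a gesture
d.touchUp(500, 400)           # finger up at the end point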

server.py

Source: server.py (GitHub)



from flask import Flask, render_template, Response, url_for, request, send_file, abort, send_from_directory, jsonify, \
    json
import yaml
import time
from datetime import datetime
import os
import matplotlib
from timeit import default_timer as timer
import jinja2
import sys
import signal
from utils import alImage_to_PIL
from utils import PIL_to_JPEG_BYTEARRAY
from utils import is_video
from utils import is_image
from utils import is_external_path
from utils import is_txt_file
import socket
import argparse
from simple_sound_stream import SpeechRecognitionModule
import qi
import vision_definitions
from urlparse import unquote

app = Flask(__name__)
QI_SESSION = None

# helper for knowing what is on the tablet
TABLET_STATE = {
    "index": None,
    "video_or_website": False
}

global camera_tab_closed
camera_tab_closed = True
global camera_tab_timestamp
camera_tab_timestamp = 0
global touchdown_hist
touchdown_hist = []
global touchmove_hist
touchmove_hist = []
global touchmove_ind
touchmove_ind = 1
global latest_touchmove_used
latest_touchmove_used = 0
global SAVE_IMGS
SAVE_IMGS = False
global RECORD_AUDIO
RECORD_AUDIO = False
global motion_vector
motion_vector = [0, 0, 0]

# Tablet needs to know where server is running...
try:
    # weird linux hack to get ip if /etc/hosts maps hostname to 127.0.0.1
    # in that case, hostip would be 127.0.0.1 and pepper fails to reach server under that ip...
    # https://stackoverflow.com/questions/55296584/getting-127-0-1-1-instead-of-192-168-1-ip-ubuntu-python
    host_name = socket.gethostname()
    global HOST_IP
    HOST_IP = socket.gethostbyname(host_name + ".local")
except socket.gaierror:
    # this is the default case and works on mac and windows...
    global HOST_IP
    HOST_IP = socket.gethostbyname(socket.gethostname())

FLASK_PORT = 5000
FLASK_HOME = "http://" + HOST_IP + ":" + str(FLASK_PORT) + "/"

@app.route('/')
def index():
    read_config()
    if QI_SESSION is not None and QI_SESSION.isConnected():  # if session already exists, frontend was just reloaded...
        global ip
        return render_template("index.html", config=config, reconnect_ip=ip)
    else:
        return render_template('index.html', config=config, reconnect_ip="")

@app.route("/connect_robot")
def connect_robot():
    """
    Connects to robot with given IP.
    """
    global ip
    ip = request.args.get('ip', type=str)
    global port
    port = 9559
    read_config()  # update the config in case it has been edited in the meantime, nice for developing ^
    global QI_SESSION
    if QI_SESSION is not None and QI_SESSION.isConnected():
        # connect btn has been pressed while robot was already connected --> it is the disconnect btn...
        del QI_SESSION
        QI_SESSION = qi.Session()  # we make a new sess but don't connect it to anything --> essentially disconnect
        print("disconnecting interface by terminating session.")
        try:
            global SpeechRecognition
            SpeechRecognition.stop()
            del SpeechRecognition
        except (RuntimeError, NameError):
            # when camera tab is not open
            pass
        return {
            "status": "disconnected"
        }
    else:
        print("connecting interface to new robot session")
        # normal connect, we make a new session and connect to it
        try:
            # TODO doesn't solve the problem that session might still be trying to connect to invalid IP...
            print("attempting close and del session")
            QI_SESSION.close()
            del QI_SESSION
            time.sleep(1)
        except AttributeError:
            print("close attribute exception, pass...")
            # if the prev session is still trying to connect...
            pass
        QI_SESSION = None
        QI_SESSION = qi.Session()
        try:
            QI_SESSION.connect(str("tcp://" + str(ip) + ":" + str(port)))
        except RuntimeError as msg:
            print("qi session connect error!:")
            print(msg)
            QI_SESSION = None
            raise Exception("Couldn't connect session")
        print("past exception!")
        get_all_services()
        # almemory event subscribers
        # global tts_sub
        # tts_sub = mem_srv.subscriber("ALTextToSpeech/TextStarted")
        # tts_sub.signal.connect(tts_callback)
        tablet_srv.onTouchDownRatio.connect(touchDown_callback)  # on touch down, aka one "click"
        tablet_srv.onTouchMove.connect(touchMove_callback)  # finger slides on tablet
        tablet_srv.onTouchUp.connect(touchUp_callback)
        global vid_finished_signal
        vid_finished_signal = tablet_srv.videoFinished
        vid_finished_signal.connect(onVidEnd)
        tts_srv.setVolume(config["volume"])
        tts_srv.setParameter("pitchShift", config["voice_pitch"])
        tts_srv.setParameter("speed", config["voice_speed"])
        # tts_srv.say("Connected")
        # iterate over autonomous life configuration and set values...
        for key in config["autonomous_life_config"].keys():
            if config["autonomous_life_config"][key] == "":
                continue
            else:
                if key == "autonomous_state":
                    al_srv.setState(config["autonomous_life_config"][key])
                elif key == "tangential_collision":
                    motion_srv.setTangentialSecurityDistance(config["autonomous_life_config"][key])
                elif key == "orthogonal_collision":
                    motion_srv.setOrthogonalSecurityDistance(config["autonomous_life_config"][key])
                elif key == "blinking":
                    ab_srv.setEnabled(config["autonomous_life_config"][key])
                elif key == "engagement_mode":
                    ba_srv.setEngagementMode(config["autonomous_life_config"][key])
                elif key == "head_breathing":
                    motion_srv.setBreathEnabled("Head", config["autonomous_life_config"][key])
                elif key == "arms_breathing":
                    motion_srv.setBreathEnabled("Arms", config["autonomous_life_config"][key])
                elif key == "body_breathing":
                    motion_srv.setBreathEnabled("Body", config["autonomous_life_config"][key])
                elif key == "legs_breathing":
                    motion_srv.setBreathEnabled("Legs", config["autonomous_life_config"][key])
                elif key == "basic_awareness":
                    ba_srv.setEnabled(config["autonomous_life_config"][key])
                elif key == "listening_movement":
                    lm_srv.setEnabled(config["autonomous_life_config"][key])
                elif key == "speaking_movement":
                    sm_srv.setEnabled(config["autonomous_life_config"][key])
        # show default image if given
        show_default_img_or_hide()
        for color in config["colors"]:
            try:
                if color["is_default"]:
                    r = color["red"]
                    g = color["green"]
                    b = color["blue"]
                    led_srv.fadeRGB("FaceLeds", r, g, b, 0.5)
            except KeyError:  # only one of the elements should have the flag...
                pass
        return {
            "status": "ok",
            "ip": ip,
        }

def tts_callback(value):
    print("in tts callback")
    print(value)

def onVidEnd():
    TABLET_STATE["video_or_website"] = False

def touchDown_callback(x, y, msg):
    print(x, y, msg)
    # we append newest first, so that we can nicely iterate over list and fade color out...
    global touchdown_hist
    touchdown_hist.insert(0, (x, y))
    if len(touchdown_hist) > 5:
        touchdown_hist = touchdown_hist[:5]

def touchMove_callback(x_offset, y_offset):
    print("slide: ", touchmove_ind, x_offset, y_offset)
    # if this is a new "series" of touchmoves, create an empty list at the current index
    global touchmove_hist
    global latest_touchmove_used
    if latest_touchmove_used != touchmove_ind:
        touchmove_hist.append([])
        latest_touchmove_used = touchmove_ind
    touchmove_hist[-1].append((x_offset / 1600, y_offset / 1080))

def touchUp_callback(x, y):
    print("Touchup!")
    global touchmove_ind
    touchmove_ind += 1
    global touchmove_hist
    touchmove_hist.append([])  # whenever we have a touchdown event, this might be followed by a finger slide...

def get_all_services():
    """
    Provides global references to all naoqi services used somewhere down the line
    """
    global tts_srv
    tts_srv = QI_SESSION.service("ALTextToSpeech")
    global al_srv
    al_srv = QI_SESSION.service("ALAutonomousLife")
    global ba_srv
    ba_srv = QI_SESSION.service("ALBasicAwareness")
    global ab_srv
    ab_srv = QI_SESSION.service("ALAutonomousBlinking")
    global motion_srv
    motion_srv = QI_SESSION.service("ALMotion")
    global video_srv
    video_srv = QI_SESSION.service("ALVideoDevice")
    global tablet_srv
    tablet_srv = QI_SESSION.service("ALTabletService")
    global as_srv
    as_srv = QI_SESSION.service("ALAnimatedSpeech")
    global ap_srv
    ap_srv = QI_SESSION.service("ALAnimationPlayer")
    global posture_srv
    posture_srv = QI_SESSION.service("ALRobotPosture")
    global ar_srv
    ar_srv = QI_SESSION.service("ALAudioRecorder")
    global ad_srv
    ad_srv = QI_SESSION.service("ALAudioDevice")
    global fd_srv
    fd_srv = QI_SESSION.service("ALFaceDetection")
    global mem_srv
    mem_srv = QI_SESSION.service("ALMemory")
    global lm_srv
    lm_srv = QI_SESSION.service("ALListeningMovement")
    global sm_srv
    sm_srv = QI_SESSION.service("ALSpeakingMovement")
    global audio_player
    audio_player = QI_SESSION.service("ALAudioPlayer")
    global led_srv
    led_srv = QI_SESSION.service("ALLeds")

@app.route("/querry_states")
def querry_states():
    """
    Queries all states that are easily accessible. E.g.: What autonomous state are we in or
    which setting is toggled?
    @return: A dict with ids from the frontend, with the value being what that element should represent
    """
    try:
        # see if audio transmission is running even though camera tab is closed...
        try:
            now = timer()
            # this should be obsolete now, camera calls close method when closed... but having this here doesn't hurt,
            # so leaving it, just in case
            if now - camera_tab_timestamp > 3:  # if no keep-alive ping within 3 seconds...
                if SpeechRecognition.isStarted:
                    print("Handling close camera tab!")
                    SpeechRecognition.stop()  # stop the audio transmission
                    # remove camera stream subscriber from video service
                    if video_srv.getSubscribers():
                        for subscriber in video_srv.getSubscribers():
                            if "CameraStream" in subscriber:  # name passed as argument on subscription
                                video_srv.unsubscribe(subscriber)
        except NameError:
            pass  # if SpeechRecognition module has never been started and doesn't exist...
        return {
            "#autonomous_states": al_srv.getState(),
            "#tangential_collision": round(motion_srv.getTangentialSecurityDistance(), 3) * 100,  # convert from m to
            "#orthogonal_collision": round(motion_srv.getOrthogonalSecurityDistance(), 3) * 100,  # cm for frontend
            "#toggle_btn_blinking": ab_srv.isEnabled(),
            "#toggle_btn_basic_awareness": ba_srv.isEnabled(),
            "#engagement_states": ba_srv.getEngagementMode(),
            "#toggle_btn_head_breathing": motion_srv.getBreathEnabled("Head"),
            "#toggle_btn_body_breathing": motion_srv.getBreathEnabled("Body"),
            "#toggle_btn_arms_breathing": motion_srv.getBreathEnabled("Arms"),
            "#toggle_btn_legs_breathing": motion_srv.getBreathEnabled("Legs"),
            "#volume_slider": tts_srv.getVolume(),
            "#voice_speed_input": tts_srv.getParameter("speed"),
            "#voice_pitch_input": tts_srv.getParameter("pitchShift"),
            "#motion_vector": [round(vel, 1) for vel in motion_srv.getRobotVelocity()],
            "#toggle_btn_listening": lm_srv.isEnabled(),
            "#toggle_btn_speaking": sm_srv.isEnabled(),
            "tablet_state": TABLET_STATE,
            "#querried_color": get_eye_colors(),
            "timestamp": timer()
        }
    except (NameError, RuntimeError):
        return {"STATE_QUERRY_ERR": "SESSION NOT AVAILABLE"}

@app.route("/set_autonomous_state")
def set_autonomous_state():
    """
    Sets the autonomous state
    """
    state = request.args.get('state', type=str)
    print(state)
    al_srv.setState(state)
    return {
        "status": "ok",
        "state": state
    }

@app.route("/set_engagement_mode")
def set_engagement_mode():
    """
    Sets the engagement mode
    """
    mode = request.args.get('mode', type=str)
    print(mode)
    ba_srv.setEngagementMode(mode)
    return {
        "status": "ok",
        "mode": mode
    }

@app.route("/say_text")
def say_text():
    msg = request.args.get('msg', type=str)
    tts_srv.say(msg)
    return {
        "status": "ok",
        "msg": msg
    }

@app.route("/toggle_setting")
def toggle_setting():
    setting = request.args.get('setting', type=str)
    print(setting)
    new_state = None
    if setting == "blinking":
        ab_srv.setEnabled(not ab_srv.isEnabled())
        new_state = ab_srv.isEnabled()
    elif setting == "head_breathing":
        motion_srv.setBreathEnabled("Head", not motion_srv.getBreathEnabled("Head"))
        new_state = motion_srv.getBreathEnabled("Head")
    elif setting == "arms_breathing":
        motion_srv.setBreathEnabled("Arms", not motion_srv.getBreathEnabled("Arms"))
        new_state = motion_srv.getBreathEnabled("Arms")
    elif setting == "body_breathing":
        motion_srv.setBreathEnabled("Body", not motion_srv.getBreathEnabled("Body"))
        new_state = motion_srv.getBreathEnabled("Body")
    elif setting == "legs_breathing":
        motion_srv.setBreathEnabled("Legs", not motion_srv.getBreathEnabled("Legs"))
        new_state = motion_srv.getBreathEnabled("Legs")
    elif setting == "basic_awareness":
        ba_srv.setEnabled(not ba_srv.isEnabled())
        new_state = ba_srv.isEnabled()
    elif setting == "listening":
        lm_srv.setEnabled(not lm_srv.isEnabled())
        new_state = lm_srv.isEnabled()
    elif setting == "speaking":
        sm_srv.setEnabled(not sm_srv.isEnabled())
        new_state = sm_srv.isEnabled()
    time.sleep(1)
    return {
        "status": "ok",
        "setting": setting,
        "new_state": new_state
    }

def show_default_img_or_hide():
    """
    Depending on whether a default image is given in the config, either shows that or resets the tablet to the default
    animation gif.
    """
    for enum_index, item in enumerate(config["tablet_items"]):
        if "is_default_img" in item.keys():
            url = FLASK_HOME + "show_img_page/" + str(enum_index)
            TABLET_STATE["index"] = enum_index
            tablet_srv.showWebview(url)
            return {
                "showing": "default image"
            }
    tablet_srv.hideWebview()
    TABLET_STATE["index"] = None
    return {
        "showing": "Pepper default gif, no default image found in config",
    }

@app.route("/serve_audio/<path:filename>")
def serve_audio(filename):
    print(filename)
    return send_from_directory(config["audio_root_location"], filename)

@app.route("/play_audio")
def play_audio():
    index = request.args.get('index', type=int)
    print(index)
    print("playing sound")
    location = config["audio_files"][index]["location"]
    # stored locally on pepper, here we can nicely use the ALAudioPlayer
    try:
        audio_file = audio_player.loadFile(location)
    except RuntimeError:
        return {
            "status": "error",
            "msg": "Couldn't load sound file '{}'. Make sure it is saved ON your pepper robot and check our README".format(location)
        }
    audio_player.setVolume(audio_file, tts_srv.getVolume())
    audio_player.play(audio_file)
    audio_player.unloadAllFiles()
    return {
        "status": "ok",
    }

@app.route("/stop_sound_play")
def stop_sound_play():
    audio_player.stopAll()
    audio_player.unloadAllFiles()
    return {
        "status": "stopped all sounds that were playing"
    }

@app.route("/show_tablet_item/<index>")
def show_tablet_item(index):
    item = config["tablet_items"][int(index)]["file_name"]
    if is_external_path(item) and not is_video(item) and not is_image(item):
        # tablet item is external website
        tablet_srv.enableWifi()
        tablet_srv.showWebview(item)
        TABLET_STATE["video_or_website"] = True
    elif is_video(item):
        if is_external_path(item):
            # externally hosted video
            video_src = item
        else:
            # video hosted locally, prepare "external" path for tablet
            video_src = FLASK_HOME + config["tablet_root_location"] + item
        tablet_srv.enableWifi()
        tablet_srv.playVideo(video_src)
        TABLET_STATE["video_or_website"] = True
    else:
        tablet_srv.showWebview(FLASK_HOME + "show_img_page/" + index)
        TABLET_STATE["video_or_website"] = False
    TABLET_STATE["index"] = index
    return {
        "status": "ok",
        "item": item
    }

def get_tablet_img_from_index(index):
    img_obj = config["tablet_items"][int(index)]
    img_src = ""
    if is_external_path(img_obj["file_name"]):
        # if it's externally hosted we don't have to do anything
        img_src = img_obj["file_name"]
    else:
        img_src = "/" + config["tablet_root_location"] + img_obj["file_name"]
    return img_src

@app.route("/show_img_page/<index>")
def show_img_page(index):
    img_src = get_tablet_img_from_index(index)
    return render_template("img_view.html", src=img_src, img_index=index)

@app.route("/clear_tablet")
def clear_tablet():
    tablet_srv.hideWebview()
    TABLET_STATE["index"] = None
    status = show_default_img_or_hide()
    status["msg"] = "cleaned tablet webview"
    return status

@app.route("/ping_curr_tablet_item")
def ping_curr_tablet_item():
    index = request.args.get('index', type=str)
    if not TABLET_STATE["video_or_website"]:
        TABLET_STATE["last_ping"] = timer()
        TABLET_STATE["index"] = index
        return {
            "set cur_tab_item": index
        }
    else:
        print("Got image tab ping, but ignored it because website or video is currently on tablet...")
        return {
            "ignored ping for cur_tab_item": index
        }

@app.route("/adjust_volume")
def adjust_volume():
    target = request.args.get('volume', type=float)
    target = target / 100.0  # slider range is 1 - 100, api wants 0 - 1
    tts_srv.setVolume(target)
    currently_playing = audio_player.getLoadedFilesIds()
    for file_id in currently_playing:
        audio_player.setVolume(int(file_id), tts_srv.getVolume())
    return {
        "status": "ok",
        "volume": target,
        "currently playing audio files": currently_playing
    }

@app.route("/stop_tts")
def stop_tts():
    tts_srv.stopAll()
    tts_srv.say("")
    return {
        "status": "stopped TTS msg!"
    }

@app.route("/exec_anim_speech")
def exec_anim_speech():
    index = request.args.get('index', type=int)
    print(index)
    annotated_text = config["animated_speech"][index]["string"]
    if is_txt_file(annotated_text):
        with open(annotated_text, "r") as f:
            annotated_text = f.read()
    as_srv.say(annotated_text)
    return {
        "status": "ok",
        "annotated_text": annotated_text
    }

@app.route("/exec_gesture")
def exec_gesture():
    index = request.args.get('index', type=int)
    print(index)
    gesture = config["gestures"][index]["gesture"]
    ap_srv.run(gesture)
    return {
        "status": "ok",
        "gesture": gesture
    }

@app.route("/exec_custom_gesture")
def exec_custom_gesture():
    string = request.args.get("string", type=str)
    print(string)
    gesture = unquote(string)
    print(gesture)
    ap_srv.run(gesture)
    return {
        "status": "ok",
        "gesture": gesture
    }

@app.route("/set_tts_param")
def set_tts_param():
    param = request.args.get("param", type=str)
    value = request.args.get("value", type=float)
    print(value)
    if param == "pitchShift":
        value = value / 100.0  # for pitch shift we need to adjust the range... nice consistency in the naoqi api >.<
        print(value)
        tts_srv.setParameter(param, value)
    else:
        tts_srv.setParameter(param, value)
    return {
        "status": "ok",
        "param": param,
        "value": value
    }

@app.route("/set_collision_radius")
def set_collision_radius():
    param = request.args.get("param", type=str)
    value = request.args.get("value", type=float)
    print(param)
    print(value)
    time.sleep(1)
    # get function dynamically from service object
    call = getattr(motion_srv, "set" + param + "SecurityDistance")
    call(value)
    return {
        "param": param,
        "value": value
    }

@app.route("/move_to")
def move_to():
    x = request.args.get("x", type=float)
    y = request.args.get("y", type=float)
    theta = request.args.get("theta", type=float)
    # Wake up robot
    # motion_service.wakeUp()
    # Send robot to Pose Init
    posture_srv.goToPosture("StandInit", 0.5)
    # set velocity
    motion_srv.moveTo(x, y, theta)
    return {
        "call": "move_to",
        "x": x,
        "y": y,
        "theta": theta
    }

@app.route("/stop_motion")
def stop_motion():
    motion_srv.stopMove()
    x_vel, y_vel, theta_vel = motion_srv.getRobotVelocity()
    x_vel = round(x_vel, 3)
    y_vel = round(y_vel, 3)
    theta_vel = round(theta_vel, 3)
    return {
        "status": "stopped move",
        "x_vel": x_vel,
        "y_vel": y_vel,
        "theta_vel": theta_vel
    }

@app.route("/resting_position")
def resting_position():
    motion_srv.stopMove()
    motion_srv.rest()
    return {
        "status": "entering resting position move"
    }

@app.route("/netural_stand_position")
def netural_stand_position():
    posture_srv.goToPosture("Stand", 0.5)
    return {
        "status": "entering 'Stand' posture"
    }

@app.route("/move_joint")
def move_joint():
    axis = request.args.get("axis", type=str)
    val = request.args.get("val", type=float)
    stiffness = 0.5
    move_time = 1  # renamed from "time" to avoid shadowing the time module
    if not motion_srv.robotIsWakeUp():
        motion_srv.wakeUp()
    motion_srv.setStiffnesses("Head", stiffness)
    motion_srv.angleInterpolation(
        [str(axis)],  # which axis
        [float(val)],  # amount of movement
        [int(move_time)],  # time for movement
        False  # in absolute angles
    )
    if "Head" in axis:
        status = "moving head"
    elif "Hip" in axis:
        status = "moving hip"
    return {
        "status": status,
        "axis": axis,
        "val": val,
        "time": move_time,
        "stiffness": stiffness
    }

@app.route("/camera_view")
def camera_view():
    # see if there are any old video subscribers...
    try:
        if video_srv.getSubscribers():
            for subscriber in video_srv.getSubscribers():
                if "CameraStream" in subscriber:  # name passed as argument on subscription
                    video_srv.unsubscribe(subscriber)
    except (NameError, RuntimeError):
        # happens when the camera tab is open but there is no session, e.g. the server has been restarted
        return render_template("camera.html")
    resolution = vision_definitions.kQVGA  # 320 * 240
    colorSpace = vision_definitions.kRGBColorSpace
    global imgClient
    imgClient = video_srv.subscribe("CameraStream", resolution, colorSpace, 30)
    global camera_tab_closed
    camera_tab_closed = False
    global camera_tab_timestamp
    camera_tab_timestamp = timer()
    global SpeechRecognition
    SpeechRecognition = SpeechRecognitionModule("SpeechRecognition", ip, port)
    SpeechRecognition.start()
    return render_template("camera.html")

@app.route("/close_camera_tab")
def close_camera_tab():
    try:
        global SpeechRecognition
        SpeechRecognition.stop()
        del SpeechRecognition
        if video_srv.getSubscribers():
            for subscriber in video_srv.getSubscribers():
                if "CameraStream" in subscriber:  # name passed as argument on subscription
                    video_srv.unsubscribe(subscriber)
    except (RuntimeError, NameError):
        # happens when camera tab is closed after naoqi session has been closed.
        pass

@app.route("/camera_tab_keep_alive")
def camera_tab_keep_alive():
    global camera_tab_timestamp
    camera_tab_timestamp = timer()
    connected = False
    try:
        if QI_SESSION is not None:
            # print(QI_SESSION.isConnected())
            # print QI_SESSION.__dict__
            if QI_SESSION.isConnected():
                connected = True
            else:
                connected = False
    except NameError:
        # when QI_SESSION is undefined, happens between connect and reconnect I think
        pass
    return {
        "set keep alive timestamp": camera_tab_timestamp,
        "connected": connected
    }

@app.route("/toggle_audio_mute")
def mute_audio():
    global SpeechRecognition
    if SpeechRecognition.isStarted:
        SpeechRecognition.stop()
    else:
        SpeechRecognition.start()
    return {
        "audio_running": SpeechRecognition.isStarted
    }

@app.route("/video_feed")
def video_feed():
    return Response(
        stream_generator(),
        mimetype='multipart/x-mixed-replace; boundary=frame')

def stream_generator():
    counter = 0
    global imgClient
    while True:
        # frame = camera.get_frame()
        try:
            alImage = video_srv.getImageRemote(imgClient)
            if alImage is not None:
                pil_img = alImage_to_PIL(alImage)
                # TODO: make image smaller? Might greatly decrease latency
                timestamp = datetime.now().strftime('%Y.%m.%d-%H:%M:%S.%f')[:-3]
                filename = timestamp + ".jpg"
                save_path = os.path.join(config["camera_save_dir"], filename)
                if not os.path.exists(config["camera_save_dir"]):
                    os.makedirs(config["camera_save_dir"])
                if SAVE_IMGS:
                    pil_img.save(save_path, "JPEG")
                jpeg_bytes = PIL_to_JPEG_BYTEARRAY(pil_img)
                counter += 1
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + jpeg_bytes + b'\r\n\r\n')
            time.sleep(0.01)
        except (RuntimeError, NameError):
            # when session gets disconnected but camera tab is open
            pass

@app.route("/toggle_img_save")
def toggle_img_save():
    global SAVE_IMGS
    SAVE_IMGS = not SAVE_IMGS
    return {
        "SAVE_IMGS": SAVE_IMGS,
        "save_dir": config["camera_save_dir"]
    }

@app.route("/record_audio_data")
def start_audio_recording():
    global RECORD_AUDIO
    RECORD_AUDIO = not RECORD_AUDIO
    timestamp = datetime.now().strftime('%Y.%m.%d-%H:%M:%S.%f')[:-3]
    filename = timestamp + ".wav"
    save_path = os.path.join(config["audio_save_dir"], filename)
    if RECORD_AUDIO:
        ad_srv.enableEnergyComputation()
        ar_srv.startMicrophonesRecording(
            save_path,
            "wav",
            16000,  # samplerate
            [1, 1, 1, 1]  # binary: which microphones do we want? [1, 1, 1, 1] => all four... [0, 1, 0, 0] a specific one
        )
    else:
        ar_srv.stopMicrophonesRecording()
        ad_srv.disableEnergyComputation()
    return {
        "now_recording_audio": RECORD_AUDIO,
        "pepper_save_dir": config["audio_save_dir"],
        "filename": filename
    }

@app.route("/set_led_intensity")
def set_led_intensity():
    group = request.args.get('led_group', type=str)
    intensity = request.args.get('intensity', type=float)
    intensity = intensity / 100.0
    print(group)
    print(intensity)
    led_srv.setIntensity(group, intensity)
    return {
        "status": "updated led intensity",
        "LED group": group,
        "intensity": intensity
    }

@app.route("/set_led_color")
def set_led_color():
    group = "FaceLeds"
    color = request.args.get('color', type=str)
    for color_enum in config["colors"]:
        if color_enum["title"] == color:
            r = color_enum["red"]
            g = color_enum["green"]
            b = color_enum["blue"]
    print(r, g, b)
    print(group)
    led_srv.fadeRGB(group, r, g, b, 0.5)
    return {
        "status": "updated led color",
        "color": color
    }

@app.route("/exec_eye_anim")
def exec_eye_anim():
    anim = request.args.get('anim', type=str)
    duration = request.args.get('secs', type=str)
    duration = float(duration)
    prev_color = get_eye_colors()
    # print(prev_color)
    if anim == "randomEyes":
        led_srv.randomEyes(duration)
    elif anim == "rasta":
        led_srv.rasta(duration)
    elif anim == "rotateEyes":
        color = request.args.get('color', type=str)
        for color_enum in config["colors"]:
            if color_enum["title"] == color:
                final_hex_int = matplotlib.colors.to_hex([color_enum["red"], color_enum["green"], color_enum["blue"]])
                # print(final_hex_int)
                final_hex_int = final_hex_int.replace("#", "0x")
                # print(final_hex_int)
                final_hex_int = int(final_hex_int, 16)
                # print(final_hex_int)
                round_time = 1.0
                led_srv.rotateEyes(final_hex_int, round_time, float(duration))
    # led_srv.fadeRGB("FaceLeds", 1.0, 1.0, 1.0, 0.5)
    led_srv.fadeRGB("FaceLeds", prev_color[0], prev_color[1], prev_color[2], 0.5)
    return {
        "status": "eye anim",
        "animation": anim
    }

def get_eye_colors():
    # just return the value of one of the Leds in one of the eyes...
    # this is BGR -.- the inconsistency in this API is unreal...
    bgr = led_srv.getIntensity("RightFaceLed1")
    rgb = [round(bgr[2], 2), round(bgr[1], 2), round(bgr[0], 2)]
    return rgb

@app.route("/tablet_drawer")
def tablet_drawer():
    return render_template('tablet_drawer.html')

@app.route("/get_touch_data")
def get_touch_data():
    global touchmove_hist
    filtered_touchmove_list = []
    # this one we must reverse here, because doing this before would be more cumbersome
    for sequence in reversed(touchmove_hist):
        if len(sequence) > 2:
            filtered_touchmove_list.append(sequence)
    if len(filtered_touchmove_list) > 5:
        filtered_touchmove_list = filtered_touchmove_list[:5]  # we keep the 5 most recent
    return {
        # we return the list in reverse order, so that we can put a nice fading color gradient on the older items...
        "touchdown_hist": touchdown_hist,
        "touchmove_hist": filtered_touchmove_list
    }

@app.route("/clear_touch_hist")
def cleat_touch_hist():
    global touchmove_hist
    touchmove_hist = []
    global touchdown_hist
    touchdown_hist = []
    global touchmove_ind
    touchmove_ind = 1
    global latest_touchmove_used
    latest_touchmove_used = 0
    return {
        "state": "reset all touch data to initial values"
    }

@app.route("/alive_test")
def alive_test():
    return {"status": "server is alive"}

def read_config(verbose=False):
    global config
    with open(CONFIG_FILE, "r") as f:
        # yaml.safe_load handles the conversion from YAML
        # scalar values to the Python dictionary format
        config = yaml.safe_load(f)
    if verbose:
        print(config)

def pretty_print_shortcut(raw_string):
    """
    A custom Jinja 2 filter that formats the list.toString that we get in the frontend for the keyboard shortcut for
    the buttons.
    Is registered for Jinja in __main__
    :param raw_string: the list.toString() string from js
    :return: a beautified version of the string
    """
    pretty_string = str(raw_string)  # raw string is a list at this point...
    pretty_string = pretty_string.replace("[", "")
    pretty_string = pretty_string.replace("]", "")
    pretty_string = pretty_string.replace("'", "")
    pretty_string = pretty_string.replace(",", "")
    pretty_string = pretty_string.replace(" ", " + ")
    return pretty_string

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", dest="config", default="config.yaml", type=str, help="Which YAML configuration file to use. ")
    args = parser.parse_args()
    global CONFIG_FILE
    CONFIG_FILE = args.config
    read_config()
    # register custom filter for jinja2, so that we can use it in the frontend
    jinja2.filters.FILTERS['prettyshortcut'] = pretty_print_shortcut
    ...
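The touch handling in server.py boils down to a few lines: connect callbacks to the ALTabletService signals (onTouchDownRatio, onTouchMove, onTouchUp) and record what they report. Here is a minimal sketch of just that wiring, assuming a reachable Pepper robot; the IP below is a placeholder.

# Minimal sketch of the ALTabletService touchMove wiring used above.
# Assumes a Pepper robot reachable at a placeholder address.
import qi

session = qi.Session()
session.connect("tcp://192.168.1.10:9559")  # placeholder robot address
tablet_srv = session.service("ALTabletService")

def on_touch_move(x_offset, y_offset):
    # fires repeatedly while a finger slides across the tablet
    print("touchMove:", x_offset, y_offset)

tablet_srv.onTouchMove.connect(on_touch_move)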


test_single_finger.py

Source: test_single_finger.py (GitHub)



# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from marionette_test import MarionetteTestCase
from marionette import Actions
from marionette import MarionetteException
# add this directory to the path
import os
import sys
sys.path.append(os.path.dirname(__file__))
from single_finger_functions import *

class testSingleFinger(MarionetteTestCase):
    def test_press_release(self):
        press_release(self.marionette, self.wait_for_condition, "button1-touchstart-touchend-mousemove-mousedown-mouseup-click")

    def test_move_element(self):
        move_element(self.marionette, self.wait_for_condition, "button1-touchstart", "button2-touchmove-touchend")

    """
    #Skipping due to Bug 874914
    def test_move_by_offset(self):
        move_element_offset(self.marionette, self.wait_for_condition, "button1-touchstart", "button2-touchmove-touchend")
    """

    def test_no_press(self):
        testAction = self.marionette.absolute_url("testAction.html")
        self.marionette.navigate(testAction)
        action = Actions(self.marionette)
        action.release()
        self.assertRaises(MarionetteException, action.perform)

    def test_wait(self):
        wait(self.marionette, self.wait_for_condition, "button1-touchstart-touchend-mousemove-mousedown-mouseup-click")

    def test_wait_with_value(self):
        wait_with_value(self.marionette, self.wait_for_condition, "button1-touchstart-touchend-mousemove-mousedown-mouseup-click")

    def test_context_menu(self):
        context_menu(self.marionette, self.wait_for_condition, "button1-touchstart-contextmenu", "button1-touchstart-contextmenu-touchend")

    def test_long_press_action(self):
        long_press_action(self.marionette, self.wait_for_condition, "button1-touchstart-contextmenu-touchend")

    """
    #Skipping due to Bug 865334
    def test_long_press_fail(self):
        testAction = self.marionette.absolute_url("testAction.html")
        self.marionette.navigate(testAction)
        button = self.marionette.find_element("id", "button1Copy")
        action = Actions(self.marionette)
        action.press(button).long_press(button, 5)
        self.assertRaises(MarionetteException, action.perform)
    """

    def test_chain(self):
        chain(self.marionette, self.wait_for_condition, "button1-touchstart", "delayed-touchmove-touchend")

    """
    #Skipping due to Bug 874914. Flick uses chained moveByOffset calls
    def test_chain_flick(self):
        chain_flick(self.marionette, self.wait_for_condition, "button1-touchstart-touchmove", "buttonFlick-touchmove-touchend")
    """

    """
    #Skipping due to Bug 865334
    def test_touchcancel_chain(self):
        testAction = self.marionette.absolute_url("testAction.html")
        self.marionette.navigate(testAction)
        button = self.marionette.find_element("id", "button1")
        action = Actions(self.marionette)
        action.press(button).wait(5).cancel()
        action.perform()
        expected = "button1-touchstart-touchcancel"
        self.wait_for_condition(lambda m: m.execute_script("return document.getElementById('button1').innerHTML;") == expected)
    """

    def test_single_tap(self):
        single_tap(self.marionette, self.wait_for_condition, "button1-touchstart-touchend-mousemove-mousedown-mouseup-click")

    def test_double_tap(self):
        ...
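test_move_element above delegates to a helper in single_finger_functions, but the gesture underneath is a chained Actions sequence. A sketch of such a press-move-release chain with the legacy Marionette Actions API used in this test; the element IDs are assumed from its testAction.html fixture.

# Sketch of a single-finger move that fires touchstart/touchmove/touchend.
# Uses the legacy Marionette Actions API shown in the test above; element
# IDs ("button1", "button2") are assumptions from its testAction.html page.
from marionette import Actions

def press_move_release(marionette):
    start = marionette.find_element("id", "button1")
    target = marionette.find_element("id", "button2")
    action = Actions(marionette)
    # press on the first button, slide to the second, then lift the finger
    action.press(start).move(target).release()
    action.perform()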


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation testing minutes FREE!

Next-Gen App & Browser Testing Cloud
