How to use __notify_listeners method in yandex-tank

Best Python code snippet using yandex-tank

basic_connector.py

Source:basic_connector.py Github

copy

Full Screen

# ... excerpt of basic_connector.py: the listing starts at its original line 48.
# These are methods of BasicSICConnector. They are shown dedented because the class header
# is outside this excerpt; the double-underscore names rely on being inside that class body.

###########################
# Event handlers          #
###########################
def on_event(self, event: str) -> None:
    """
    Dispatch an argument-less event to the action listener and the touch listener registered for it.
    :param event: name of the event
    :return:
    """
    self.__notify_listeners(event)
    self.__notify_touch_listeners(event)


def on_posture_changed(self, posture: str) -> None:
    """
    Notify the 'onPostureChanged' listener and store the posture in robot_state.
    :param posture: posture name; its upper-cased form is used as the RobotPosture member name
    :return:
    """
    self.__notify_listeners('onPostureChanged', posture)
    self.robot_state['posture'] = RobotPosture[posture.upper()]


def on_awake_changed(self, is_awake: bool) -> None:
    """Notify the 'onAwakeChanged' listener and store the flag in robot_state['is_awake']."""
    self.__notify_listeners('onAwakeChanged', is_awake)
    self.robot_state['is_awake'] = is_awake


def on_audio_language(self, language_key: str) -> None:
    """Notify the 'onAudioLanguage' listener with the language key."""
    self.__notify_listeners('onAudioLanguage', language_key)


def on_audio_intent(self, detection_result: dict) -> None:
    """Notify the 'onAudioIntent' listener with the detection result."""
    self.__notify_listeners('onAudioIntent', detection_result)


def on_text_transcript(self, transcript: str) -> None:
    """Notify the 'onTextTranscript' listener with the transcript."""
    self.__notify_listeners('onTextTranscript', transcript)


def on_text_sentiment(self, sentiment: str) -> None:
    """Notify the 'onTextSentiment' listener with the sentiment."""
    self.__notify_listeners('onTextSentiment', sentiment)


def on_new_audio_file(self, audio_file: str) -> None:
    """Notify the 'onNewAudioFile' listener with the audio file."""
    self.__notify_listeners('onNewAudioFile', audio_file)


def on_new_picture_file(self, picture_file: str) -> None:
    """Notify the 'onNewPictureFile' listener with the picture file."""
    self.__notify_listeners('onNewPictureFile', picture_file)


def on_person_detected(self, x: int, y: int) -> None:
    """Notify the 'onPersonDetected' vision listener with the x and y values."""
    self.__notify_vision_listeners('onPersonDetected', x, y)


def on_face_recognized(self, identifier: str) -> None:
    """Notify the 'onFaceRecognized' vision listener with the identifier."""
    self.__notify_vision_listeners('onFaceRecognized', identifier)


def on_emotion_detected(self, emotion: str) -> None:
    """Notify the 'onEmotionDetected' vision listener with the emotion."""
    self.__notify_vision_listeners('onEmotionDetected', emotion)


def on_corona_check_passed(self) -> None:
    """Notify the 'onCoronaCheckPassed' vision listener (no arguments)."""
    self.__notify_vision_listeners('onCoronaCheckPassed')


def on_battery_charge_changed(self, percentage: int) -> None:
    """Notify the 'onBatteryChargeChanged' listener and store the value in robot_state['battery_charge']."""
    self.__notify_listeners('onBatteryChargeChanged', percentage)
    self.robot_state['battery_charge'] = percentage


def on_charging_changed(self, is_charging: bool) -> None:
    """Notify the 'onChargingChanged' listener and store the flag in robot_state['is_charging']."""
    self.__notify_listeners('onChargingChanged', is_charging)
    self.robot_state['is_charging'] = is_charging


def on_hot_device_detected(self, hot_devices: list) -> None:
    """Notify the 'onHotDeviceDetected' listener and store the list in robot_state['hot_devices']."""
    self.__notify_listeners('onHotDeviceDetected', hot_devices)
    self.robot_state['hot_devices'] = hot_devices


def on_robot_motion_recording(self, motion: str) -> None:
    """Notify the 'onRobotMotionRecording' listener with the motion."""
    self.__notify_listeners('onRobotMotionRecording', motion)


def on_browser_button(self, button: str) -> None:
    """Notify the 'onBrowserButton' listener with the button."""
    self.__notify_listeners('onBrowserButton', button)


###########################
# Speech Recognition      #
###########################
def speech_recognition(self, context: str, max_duration: int, callback: callable = None) -> None:
    """
    Initiate a speech recognition attempt using Google's Dialogflow using a context.
    For more information on contexts see: https://cloud.google.com/dialogflow/docs/contexts-overview

    The robot will stream audio for at most max_duration seconds to Dialogflow to recognize something.
    The result (or a 'fail') is returned via the callback function.

    :param context: Google's Dialogflow context label (str)
    :param max_duration: maximum time to listen in seconds (int)
    :param callback: callback function that will be called when a result (or fail) becomes available
    :return:
    """
    enhanced_callback, fail_callback, lock = self.__build_speech_recording_callback(callback)
    self.__register_listener('onAudioIntent', enhanced_callback)
    self.__register_listener('IntentDetectionDone', fail_callback)
    # The worker thread waits on the lock so this call returns immediately.
    Thread(target=self.__recognizing, args=(context, lock, max_duration)).start()


def record_audio(self, duration: int, callback: callable = None) -> None:
    """
    Records audio for a number of duration seconds. The location of the audio is returned via the callback function.

    :param duration: number of seconds of audio that will be recorded.
    :param callback: callback function that will be called when the audio is recorded.
    :return:
    """
    # Only the success path is registered here; the fail callback is deliberately unused.
    success_callback, _, lock = self.__build_speech_recording_callback(callback)
    self.__register_listener('onNewAudioFile', success_callback)
    Thread(target=self.__recording, args=(lock, duration)).start()


def __recognizing(self, context: str, lock: Event, max_duration: int) -> None:
    """
    Worker for speech_recognition: restart listening with the given context, block until the
    lock is set by one of the callbacks, then unregister both speech listeners.
    """
    self.stop_listening()
    self.set_dialogflow_context(context)
    self.start_listening(max_duration)
    lock.wait()
    self.__unregister_listener('onAudioIntent')
    self.__unregister_listener('IntentDetectionDone')


def __recording(self, lock: Event, max_duration: int) -> None:
    """
    Worker for record_audio: enable audio recording, listen, block until the lock is set by the
    success callback, then disable recording and unregister the listener.
    """
    self.stop_listening()
    self.set_record_audio(True)
    self.start_listening(max_duration)
    lock.wait()
    self.set_record_audio(False)
    self.__unregister_listener('onNewAudioFile')


@staticmethod
def __build_speech_recording_callback(embedded_callback: callable = None):
    """
    Build a (success_callback, fail_callback, lock) triple around an optional user callback.

    success_callback sets the lock and forwards its arguments to embedded_callback.
    fail_callback only acts while the lock is still unset: it sets the lock and calls
    embedded_callback(None).

    :param embedded_callback: optional user callback
    :return: tuple (success_callback, fail_callback, lock) where lock is a threading Event
    """
    lock = Event()

    def success_callback(*args):
        lock.set()
        if embedded_callback:
            embedded_callback(*args)

    def fail_callback():
        # A success that already set the lock wins; do not report a failure afterwards.
        if not lock.is_set():
            lock.set()
            if embedded_callback:
                embedded_callback(None)

    return success_callback, fail_callback, lock


###########################
# Vision                  #
###########################
def take_picture(self, callback: callable = None) -> None:
    """
    Take a picture. Location of the stored picture is returned via callback.

    :param callback: optional; when omitted, None is stored for the event and nothing is called
    :return:
    """
    self.__register_listener('onNewPictureFile', callback)
    super(BasicSICConnector, self).take_picture()


def start_face_recognition(self, callback: callable = None) -> None:
    """
    Start face recognition. Each time a face is detected, the callback function is called with the recognition result.

    :param callback:
    :return:
    """
    self.__start_vision_recognition('onFaceRecognized', callback)


def stop_face_recognition(self) -> None:
    """
    Stop face recognition.

    :return:
    """
    self.__stop_vision_recognition('onFaceRecognized')


def start_people_detection(self, callback: callable = None) -> None:
    """
    Start people detection. Each time a person is detected, the callback function is called.

    :param callback:
    :return:
    """
    self.__start_vision_recognition('onPersonDetected', callback)


def stop_people_detection(self) -> None:
    """
    Stop people detection.

    :return:
    """
    self.__stop_vision_recognition('onPersonDetected')


def start_emotion_detection(self, callback: callable = None) -> None:
    """
    Start emotion detection. Each time an emotion becomes available the callback function is called with the emotion.

    :param callback:
    :return:
    """
    self.__start_vision_recognition('onEmotionDetected', callback)


def stop_emotion_detection(self) -> None:
    """
    Stop emotion detection.

    :return:
    """
    self.__stop_vision_recognition('onEmotionDetected')


def start_corona_checker(self, callback: callable = None) -> None:
    """
    Start the corona checker. Each time a valid QR code becomes available the callback function is called.

    :param callback:
    :return:
    """
    self.__start_vision_recognition('onCoronaCheckPassed', callback)


def stop_corona_checker(self) -> None:
    """
    Stop the corona checker.

    :return:
    """
    self.__stop_vision_recognition('onCoronaCheckPassed')


def __start_vision_recognition(self, event: str, callback: callable = None) -> None:
    """
    Register a vision listener; (re)start looking only when no vision listener was registered yet.

    The event is registered even when callback is None, so the "any vision listener active"
    bookkeeping in __stop_vision_recognition stays correct.
    """
    if not self.__vision_listeners:
        self.stop_looking()
        self.start_looking(0)
    self.__register_vision_listener(event, callback)


def __stop_vision_recognition(self, event: str) -> None:
    """Unregister a vision listener and stop looking once no vision listener is left."""
    self.__unregister_vision_listener(event)
    if not self.__vision_listeners:
        self.stop_looking()


###########################
# Touch                   #
###########################
def subscribe_touch_listener(self, touch_event: str, callback: callable) -> None:
    """
    Subscribe a touch listener. The callback function will be called each time the touch_event becomes available.

    :param touch_event:
    :param callback:
    :return:
    """
    self.__touch_listeners[touch_event] = callback


def unsubscribe_touch_listener(self, touch_event: str) -> None:
    """
    Unsubscribe touch listener. Unsubscribing an event that is not subscribed is a no-op.

    :param touch_event:
    :return:
    """
    self.__touch_listeners.pop(touch_event, None)


###########################
# Robot actions           #
###########################
# Every action below follows the same pattern: when a callback is given it is registered for
# the action's completion event, then the call is forwarded to the base connector.
def set_language(self, language_key: str, callback: callable = None) -> None:
    """Forward set_language; callback (if given) is registered for 'LanguageChanged'."""
    if callback:
        self.__register_listener('LanguageChanged', callback)
    super(BasicSICConnector, self).set_language(language_key)


def set_idle(self, callback: callable = None) -> None:
    """Forward set_idle; callback (if given) is registered for 'SetIdle'."""
    if callback:
        self.__register_listener('SetIdle', callback)
    super(BasicSICConnector, self).set_idle()


def set_non_idle(self, callback: callable = None) -> None:
    """Forward set_non_idle; callback (if given) is registered for 'SetNonIdle'."""
    if callback:
        self.__register_listener('SetNonIdle', callback)
    super(BasicSICConnector, self).set_non_idle()


def say(self, text: str, callback: callable = None) -> None:
    """Forward say; callback (if given) is registered for 'TextDone'."""
    if callback:
        self.__register_listener('TextDone', callback)
    super(BasicSICConnector, self).say(text)


def say_animated(self, text: str, callback: callable = None) -> None:
    """Forward say_animated; callback (if given) is registered for 'TextDone'."""
    if callback:
        self.__register_listener('TextDone', callback)
    super(BasicSICConnector, self).say_animated(text)


def do_gesture(self, gesture: str, callback: callable = None) -> None:
    """Forward do_gesture; callback (if given) is registered for 'GestureDone'."""
    if callback:
        self.__register_listener('GestureDone', callback)
    super(BasicSICConnector, self).do_gesture(gesture)


def play_audio(self, audio_file: str, callback: callable = None) -> None:
    """Forward play_audio; callback (if given) is registered for 'PlayAudioDone'."""
    if callback:
        self.__register_listener('PlayAudioDone', callback)
    super(BasicSICConnector, self).play_audio(audio_file)


def load_audio(self, audio_file: str, callback: callable = None) -> None:
    """Forward load_audio; callback (if given) is registered for 'LoadAudioDone'."""
    if callback:
        self.__register_listener('LoadAudioDone', callback)
    super(BasicSICConnector, self).load_audio(audio_file)


def play_loaded_audio(self, audio_identifier: int, callback: callable = None) -> None:
    """Forward play_loaded_audio; callback (if given) is registered for 'PlayAudioDone'."""
    if callback:
        self.__register_listener('PlayAudioDone', callback)
    super(BasicSICConnector, self).play_loaded_audio(audio_identifier)


def clear_loaded_audio(self, callback: callable = None) -> None:
    """Forward clear_loaded_audio; callback (if given) is registered for 'ClearLoadedAudioDone'."""
    if callback:
        self.__register_listener('ClearLoadedAudioDone', callback)
    super(BasicSICConnector, self).clear_loaded_audio()


def set_eye_color(self, color: str, callback: callable = None) -> None:
    """Forward set_eye_color; callback (if given) is registered for 'EyeColourDone'."""
    if callback:
        self.__register_listener('EyeColourDone', callback)
    super(BasicSICConnector, self).set_eye_color(color)


def set_ear_color(self, color: str, callback: callable = None) -> None:
    """Forward set_ear_color; callback (if given) is registered for 'EarColourDone'."""
    if callback:
        self.__register_listener('EarColourDone', callback)
    super(BasicSICConnector, self).set_ear_color(color)


def set_head_color(self, color: str, callback: callable = None) -> None:
    """Forward set_head_color; callback (if given) is registered for 'HeadColourDone'."""
    if callback:
        self.__register_listener('HeadColourDone', callback)
    super(BasicSICConnector, self).set_head_color(color)


def set_led_color(self, leds: list, colors: list, duration: int = 0, callback: callable = None) -> None:
    """Forward set_led_color; callback (if given) is registered for 'LedColorDone'."""
    if callback:
        self.__register_listener('LedColorDone', callback)
    super(BasicSICConnector, self).set_led_color(leds, colors, duration)


def start_led_animation(self, led_group: str, anim_type: str, colors: list, speed: int,
                        callback: callable = None) -> None:
    """Forward start_led_animation; callback (if given) is registered for 'LedAnimationDone'."""
    if callback:
        self.__register_listener('LedAnimationDone', callback)
    super(BasicSICConnector, self).start_led_animation(led_group, anim_type, colors, speed)


def turn(self, degrees: int, callback: callable = None) -> None:
    """Forward turn; callback (if given) is registered for 'TurnDone'."""
    if callback:
        self.__register_listener('TurnDone', callback)
    super(BasicSICConnector, self).turn(degrees)


def wake_up(self, callback: callable = None) -> None:
    """Forward wake_up; callback (if given) is registered for 'WakeUpDone'."""
    if callback:
        self.__register_listener('WakeUpDone', callback)
    super(BasicSICConnector, self).wake_up()


def rest(self, callback: callable = None) -> None:
    """Forward rest; callback (if given) is registered for 'RestDone'."""
    if callback:
        self.__register_listener('RestDone', callback)
    super(BasicSICConnector, self).rest()


def set_breathing(self, enable: bool, callback: callable = None) -> None:
    """
    Forward set_breathing; callback (if given) is registered for 'BreathingEnabled' when
    enable is true and for 'BreathingDisabled' otherwise.
    """
    if callback:
        self.__register_listener('Breathing' + ('Enabled' if enable else 'Disabled'), callback)
    super(BasicSICConnector, self).set_breathing(enable)


def go_to_posture(self, posture: Enum, speed: int = 100, callback: callable = None) -> None:
    """
    The robot will try for 3 times to reach a position.
    go_to_posture's callback returns a bool indicating whether the given posture was successfully reached.

    :param posture: enum member; its value is forwarded to the base connector
    :param speed: forwarded unchanged to the base connector
    :param callback: optional; called with True/False on the 'GoToPostureDone' event
    :return:
    """
    if callback:
        self.__register_listener('GoToPostureDone', partial(self.__posture_callback,
                                                            target_posture=posture,
                                                            embedded_callback=callback))
    super(BasicSICConnector, self).go_to_posture(posture.value, speed)


def __posture_callback(self, target_posture: Enum, embedded_callback: callable) -> None:
    """Call embedded_callback with whether robot_state['posture'] equals the requested posture."""
    embedded_callback(self.robot_state['posture'] == target_posture)


def set_stiffness(self, joints: list, stiffness: int, duration: int = 1000, callback: callable = None) -> None:
    """Forward set_stiffness; callback (if given) is registered for 'SetStiffnessDone'."""
    if callback:
        self.__register_listener('SetStiffnessDone', callback)
    super(BasicSICConnector, self).set_stiffness(joints, stiffness, duration)


def play_motion(self, motion: str, callback: callable = None) -> None:
    """Forward play_motion; callback (if given) is registered for 'PlayMotionDone'."""
    if callback:
        self.__register_listener('PlayMotionDone', callback)
    super(BasicSICConnector, self).play_motion(motion)


def start_record_motion(self, joint_chains: list, framerate: int = 5, callback: callable = None) -> None:
    """Forward start_record_motion; callback (if given) is registered for 'RecordMotionStarted'."""
    if callback:
        self.__register_listener('RecordMotionStarted', callback)
    super(BasicSICConnector, self).start_record_motion(joint_chains, framerate)


def stop_record_motion(self, callback: callable = None) -> None:
    """Forward stop_record_motion; callback (if given) is registered for 'onRobotMotionRecording'."""
    if callback:
        self.__register_listener('onRobotMotionRecording', callback)
    super(BasicSICConnector, self).stop_record_motion()


def browser_show(self, html: str, callback: callable = None) -> None:
    """
    Forward browser_show to the base connector.

    NOTE(review): callback is accepted but never registered or called here; confirm whether
    a completion event exists for this action.
    """
    super(BasicSICConnector, self).browser_show(html)


###########################
# Robot action Listeners  #
###########################
def subscribe_condition(self, condition: Condition) -> None:
    """
    Subscribe a threading.Condition object that will be notified each time a registered callback is called.

    :param condition: Condition object that will be notified
    :return:
    """
    self.__conditions.append(condition)


def unsubscribe_condition(self, condition: Condition) -> None:
    """
    Unsubscribe the threading.Condition object.

    :param condition: Condition object to unsubscribe
    :return:
    """
    if condition in self.__conditions:
        self.__conditions.remove(condition)


def __notify_conditions(self) -> None:
    """Acquire each subscribed condition and notify one waiter on it."""
    for condition in self.__conditions:
        with condition:
            condition.notify()


def __register_listener(self, event: str, callback: callable) -> None:
    """Store callback as the single listener for event, replacing any previous one."""
    self.__listeners[event] = callback


def __unregister_listener(self, event: str) -> None:
    """Remove the listener for event; a missing event is ignored (as in unsubscribe_condition)."""
    # pop() instead of del: the worker threads may unregister an event that is already gone.
    self.__listeners.pop(event, None)


def __register_vision_listener(self, event: str, callback: callable) -> None:
    """Store callback as the single vision listener for event, replacing any previous one."""
    self.__vision_listeners[event] = callback


def __unregister_vision_listener(self, event: str) -> None:
    """Remove the vision listener for event; a missing event is ignored."""
    self.__vision_listeners.pop(event, None)


def __dispatch(self, registry: dict, event: str, args: tuple) -> None:
    """
    Shared implementation of the three __notify_* methods.

    Does nothing when event is not registered. Otherwise calls the stored listener with args
    (skipped when None was stored, i.e. the event was registered without a callback) and then
    notifies the subscribed conditions.
    """
    try:
        # Single lookup: another thread may unregister the event between a check and a read.
        listener = registry[event]
    except KeyError:
        return
    if listener is not None:
        listener(*args)
    self.__notify_conditions()


def __notify_listeners(self, event: str, *args) -> None:
    """Call the action listener registered for event, if any, then notify the conditions."""
    self.__dispatch(self.__listeners, event, args)


def __notify_vision_listeners(self, event: str, *args) -> None:
    """Call the vision listener registered for event, if any, then notify the conditions."""
    self.__dispatch(self.__vision_listeners, event, args)


def __notify_touch_listeners(self, event: str, *args) -> None:
    """Call the touch listener registered for event, if any, then notify the conditions."""
    self.__dispatch(self.__touch_listeners, event, args)

# ... the listing ends here (its original line 395); anything after it is not shown.

Full Screen

Full Screen

PressureInput.py

Source:PressureInput.py Github

copy

Full Screen

# ... excerpt of PressureInput.py: the listing starts at its original line 35, inside a class
# body whose header and earlier attributes are not shown. It is dedented here for that reason;
# the double-underscore names rely on being inside that class body.
strong_action_hysteresis: float = 30
# Class-level declaration kept for compatibility; __init__ gives every instance its own list.
__listeners: [SipPuffListener] = []


def __init__(self, sensor: IPressureSensor):
    """
    :param sensor: pressure sensor that update() reads from
    """
    self.__pressure_sensor = sensor
    # Per-instance list: appending to the class-level list would share listeners
    # between all instances.
    self.__listeners = []


def __notify_listeners(self, event: SipPuffEvent):
    """Pass event to handle_sip_puff_event of every registered listener."""
    if self.debug:
        print("Notifying listeners of event: " + str(event))
    for listener in self.__listeners:
        listener.handle_sip_puff_event(event)


def register_listener(self, listener: SipPuffListener):
    """Add a listener that will receive every emitted event."""
    self.__listeners.append(listener)


def update(self):
    """
    Takes a reading from the underlying sensor and processes it.
    This should be called in a loop externally.
    :return: None, A side effect of this method might be the emission of a :class:SipPuffEvent to listeners
    """
    sensor_value = self.__pressure_sensor.get_pressure_in_Pascal()
    self.__reference_pressure_filter.update(sensor_value)
    reference_value = self.__reference_pressure_filter.get_ambient_pressure_estimation()
    # All state handlers work on the difference to the estimated ambient pressure.
    pdiff = sensor_value - reference_value
    if self.debug:
        print("Reference value: " + str(reference_value))
        print("Sensor value: " + str(sensor_value))
        print("Pressure difference: " + str(pdiff))
    if self.__current_state == InputState.IDLE:
        self.__update_IDLE(pdiff)
    elif self.__current_state == InputState.MEASURING:
        self.__update_MEASURING(pdiff)
    elif self.__current_state == InputState.FINISHED_WAITING:
        self.__update_FINISHED_WAITING(pdiff)
    else:
        raise Exception("Untreated enum value for input state: " + str(self.__current_state))


def get_current_duration(self) -> Optional[float]:
    """
    :return: seconds elapsed since the current action started, or None when no action is running
    """
    if self.__action_start_time is None:
        return None
    return time.perf_counter() - self.__action_start_time


def __get_action_avg_pressure(self) -> float:
    """Return the mean of the recorded pressure differences, or 0 when none are recorded."""
    history = self.__action_pressure_history
    if not history:
        return 0.0
    return sum(history) / len(history)


def __notify_for_average(self, strong_sip, strong_puff, weak_sip, weak_puff) -> None:
    """
    Emit the event matching the average pressure of the current action.

    Negative averages select the sip events, positive averages the puff events. Nothing is
    emitted when the average lies within the weak threshold.
    """
    avg_pressure = self.__get_action_avg_pressure()
    # Order matters here. Match the extreme conditions first!
    if avg_pressure < -self.strong_action_thresh:
        event = strong_sip
    elif avg_pressure > self.strong_action_thresh:
        event = strong_puff
    elif avg_pressure < -self.weak_action_thresh:
        event = weak_sip
    elif avg_pressure > self.weak_action_thresh:
        event = weak_puff
    else:
        return
    self.__notify_listeners(event)


def __update_IDLE(self, pdiff):
    """Reset the bookkeeping; start measuring when pdiff reaches the weak threshold."""
    # clear bookkeeping
    self.__action_start_time = None
    self.__action_pressure_history.clear()
    # Ignore no sipping or puffing
    if abs(pdiff) < self.weak_action_thresh:
        return
    if self.debug:
        print("Changing mode from to measuring for next cycle")
    # Start the timer and add the observation
    self.__action_start_time = time.perf_counter()
    self.__action_pressure_history.append(pdiff)
    # Change state to Measuring
    self.__current_state = InputState.MEASURING


def __update_MEASURING(self, pdiff):
    """Record pdiff, or finish the action as a long or short event."""
    # Check if we are past the duration for a long event
    if self.get_current_duration() > self.long_Action_min_time:
        self.__current_state = InputState.FINISHED_WAITING
        if self.debug:
            print("Changing mode to waiting for next cycle")
        self.__notify_for_average(SipPuffEvent.LONG_STRONG_SIP, SipPuffEvent.LONG_STRONG_PUFF,
                                  SipPuffEvent.LONG_WEAK_SIP, SipPuffEvent.LONG_WEAK_PUFF)
        return
    # If we slip below the hysteresis around 0, the action is over
    if abs(pdiff) < self.weak_action_thresh - self.weak_action_hysteresis:
        self.__current_state = InputState.IDLE
        # Have we cleared the minimum time for an action?
        if self.get_current_duration() < self.short_action_min_time:
            return
        self.__notify_for_average(SipPuffEvent.SHORT_STRONG_SIP, SipPuffEvent.SHORT_STRONG_PUFF,
                                  SipPuffEvent.SHORT_WEAK_SIP, SipPuffEvent.SHORT_WEAK_PUFF)
        return
    # Append measurement at the end if we do not stop
    # We want to ignore the measurement that's falling below the threshold
    self.__action_pressure_history.append(pdiff)


def __update_FINISHED_WAITING(self, pdiff):
    # This state is for waiting out the time after a long input
    # without triggering a new one
    if abs(pdiff) < self.weak_action_thresh - self.weak_action_hysteresis:
        self.__current_state = InputState.IDLE
        if self.debug:
            ...  # the listing is truncated here; the rest of this method is not shown

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful