Best Python code snippet using yandex-tank
basic_connector.py
Source:basic_connector.py  
...48    ###########################49    # Event handlers          #50    ###########################51    def on_event(self, event: str) -> None:52        self.__notify_listeners(event)53        self.__notify_touch_listeners(event)54    def on_posture_changed(self, posture: str) -> None:55        self.__notify_listeners('onPostureChanged', posture)56        self.robot_state['posture'] = RobotPosture[posture.upper()]57    def on_awake_changed(self, is_awake: bool) -> None:58        self.__notify_listeners('onAwakeChanged', is_awake)59        self.robot_state['is_awake'] = is_awake60    def on_audio_language(self, language_key: str) -> None:61        self.__notify_listeners('onAudioLanguage', language_key)62    def on_audio_intent(self, detection_result: dict) -> None:63        self.__notify_listeners('onAudioIntent', detection_result)64    def on_text_transcript(self, transcript: str) -> None:65        self.__notify_listeners('onTextTranscript', transcript)66    def on_text_sentiment(self, sentiment: str) -> None:67        self.__notify_listeners('onTextSentiment', sentiment)68    def on_new_audio_file(self, audio_file: str) -> None:69        self.__notify_listeners('onNewAudioFile', audio_file)70    def on_new_picture_file(self, picture_file: str) -> None:71        self.__notify_listeners('onNewPictureFile', picture_file)72    def on_person_detected(self, x: int, y: int) -> None:73        self.__notify_vision_listeners('onPersonDetected', x, y)74    def on_face_recognized(self, identifier: str) -> None:75        self.__notify_vision_listeners('onFaceRecognized', identifier)76    def on_emotion_detected(self, emotion: str) -> None:77        self.__notify_vision_listeners('onEmotionDetected', emotion)78    def on_corona_check_passed(self) -> None:79        self.__notify_vision_listeners('onCoronaCheckPassed')80    def on_battery_charge_changed(self, percentage: int) -> None:81        self.__notify_listeners('onBatteryChargeChanged', percentage)82        self.robot_state['battery_charge'] = percentage83    def on_charging_changed(self, is_charging: bool) -> None:84        self.__notify_listeners('onChargingChanged', is_charging)85        self.robot_state['is_charging'] = is_charging86    def on_hot_device_detected(self, hot_devices: list) -> None:87        self.__notify_listeners('onHotDeviceDetected', hot_devices)88        self.robot_state['hot_devices'] = hot_devices89    def on_robot_motion_recording(self, motion: str) -> None:90        self.__notify_listeners('onRobotMotionRecording', motion)91    def on_browser_button(self, button: str) -> None:92        self.__notify_listeners('onBrowserButton', button)93    ###########################94    # Speech Recognition      #95    ###########################96    def speech_recognition(self, context: str, max_duration: int, callback: callable = None) -> None:97        """98        Initiate a speech recognition attempt using Google's Dialogflow using a context.99        For more information on contexts see: https://cloud.google.com/dialogflow/docs/contexts-overview100        The robot will stream audio for at most max_duraction seconds to Dialogflow to recognize something.101        The result (or a 'fail') is returned via the callback function.102        :param context: Google's Dialogflow context label (str)103        :param max_duration: maximum time to listen in seconds (int)104        :param callback: callback function that will be called when a result (or fail) becomes available105        :return:106        """107        enhanced_callback, fail_callback, lock = self.__build_speech_recording_callback(callback)108        self.__register_listener('onAudioIntent', enhanced_callback)109        self.__register_listener('IntentDetectionDone', fail_callback)110        Thread(target=self.__recognizing, args=(context, lock, max_duration)).start()111    def record_audio(self, duration: int, callback: callable = None) -> None:112        """113        Records audio for a number of duration seconds. The location of the audio is returned via the callback function.114        :param duration: number of second of audio that will be recorded.115        :param callback: callback function that will be called when the audio is recorded.116        :return:117        """118        success_callback, _, lock = self.__build_speech_recording_callback(callback)119        self.__register_listener('onNewAudioFile', success_callback)120        Thread(target=self.__recording, args=(lock, duration)).start()121    def __recognizing(self, context: str, lock: Event, max_duration: int) -> None:122        self.stop_listening()123        self.set_dialogflow_context(context)124        self.start_listening(max_duration)125        lock.wait()126        self.__unregister_listener('onAudioIntent')127        self.__unregister_listener('IntentDetectionDone')128    def __recording(self, lock: Event, max_duration: int) -> None:129        self.stop_listening()130        self.set_record_audio(True)131        self.start_listening(max_duration)132        lock.wait()133        self.set_record_audio(False)134        self.__unregister_listener('onNewAudioFile')135    @staticmethod136    def __build_speech_recording_callback(embedded_callback: callable = None):137        lock = Event()138        def success_callback(*args):139            lock.set()140            if embedded_callback:141                embedded_callback(*args)142        def fail_callback():143            if not lock.is_set():144                lock.set()145                if embedded_callback:146                    embedded_callback(None)147        return success_callback, fail_callback, lock148    ###########################149    # Vision                  #150    ###########################151    def take_picture(self, callback: callable = None) -> None:152        """153        Take a picture. Location of the stored picture is returned via callback.154        :param callback:155        :return:156        """157        self.__register_listener('onNewPictureFile', callback)158        super(BasicSICConnector, self).take_picture()159    def start_face_recognition(self, callback: callable = None) -> None:160        """161        Start face recognition. Each time a face is detected, the callback function is called with the recognition result.162        :param callback:163        :return:164        """165        self.__start_vision_recognition('onFaceRecognized', callback)166    def stop_face_recognition(self) -> None:167        """168        Stop face recognition.169        :return:170        """171        self.__stop_vision_recognition('onFaceRecognized')172    def start_people_detection(self, callback: callable = None) -> None:173        """174        Start people detection. Each time a person is detected, the callback function is called.175        :param callback:176        :return:177        """178        self.__start_vision_recognition('onPersonDetected', callback)179    def stop_people_detection(self) -> None:180        """181        Stop people detection.182        :return:183        """184        self.__stop_vision_recognition('onPersonDetected')185    def start_emotion_detection(self, callback: callable = None) -> None:186        """187        Start emotion detection. Each time an emotion becomes available the callback function is called with the emotion.188        :param callback:189        :return:190        """191        self.__start_vision_recognition('onEmotionDetected', callback)192    def stop_emotion_detection(self) -> None:193        """194        Stop emotion detection.195        :return:196        """197        self.__stop_vision_recognition('onEmotionDetected')198    def start_corona_checker(self, callback: callable = None) -> None:199        """200        Start the corona checker. Each time a valid QR code becomes available the callback function is called.201        :param callback:202        :return:203        """204        self.__start_vision_recognition('onCoronaCheckPassed', callback)205    def stop_corona_checker(self) -> None:206        """207        Stop the corona checker.208        :return:209        """210        self.__stop_vision_recognition('onCoronaCheckPassed')211    def __start_vision_recognition(self, event: str, callback: callable = None) -> None:212        if not self.__vision_listeners:213            self.stop_looking()214            self.start_looking(0)215        self.__register_vision_listener(event, callback)216    def __stop_vision_recognition(self, event: str) -> None:217        self.__unregister_vision_listener(event)218        if not self.__vision_listeners:219            self.stop_looking()220    ###########################221    # Touch                   #222    ###########################223    def subscribe_touch_listener(self, touch_event: str, callback: callable) -> None:224        """225        Subscribe a touch listener. The callback function will be called each time the touch_event becomes available.226        :param touch_event:227        :param callback:228        :return:229        """230        self.__touch_listeners[touch_event] = callback231    def unsubscribe_touch_listener(self, touch_event: str) -> None:232        """233        Unsubscribe touch listener.234        :param touch_event:235        :return:236        """237        del self.__touch_listeners[touch_event]238    ###########################239    # Robot actions           #240    ###########################241    def set_language(self, language_key: str, callback: callable = None) -> None:242        if callback:243            self.__register_listener('LanguageChanged', callback)244        super(BasicSICConnector, self).set_language(language_key)245    def set_idle(self, callback: callable = None) -> None:246        if callback:247            self.__register_listener('SetIdle', callback)248        super(BasicSICConnector, self).set_idle()249    def set_non_idle(self, callback: callable = None) -> None:250        if callback:251            self.__register_listener('SetNonIdle', callback)252        super(BasicSICConnector, self).set_non_idle()253    def say(self, text: str, callback: callable = None) -> None:254        if callback:255            self.__register_listener('TextDone', callback)256        super(BasicSICConnector, self).say(text)257    def say_animated(self, text: str, callback: callable = None) -> None:258        if callback:259            self.__register_listener('TextDone', callback)260        super(BasicSICConnector, self).say_animated(text)261    def do_gesture(self, gesture: str, callback: callable = None) -> None:262        if callback:263            self.__register_listener('GestureDone', callback)264        super(BasicSICConnector, self).do_gesture(gesture)265    def play_audio(self, audio_file: str, callback: callable = None) -> None:266        if callback:267            self.__register_listener('PlayAudioDone', callback)268        super(BasicSICConnector, self).play_audio(audio_file)269    def load_audio(self, audio_file: str, callback: callable = None) -> None:270        if callback:271            self.__register_listener('LoadAudioDone', callback)272        super(BasicSICConnector, self).load_audio(audio_file)273    def play_loaded_audio(self, audio_identifier: int, callback: callable = None) -> None:274        if callback:275            self.__register_listener('PlayAudioDone', callback)276        super(BasicSICConnector, self).play_loaded_audio(audio_identifier)277    def clear_loaded_audio(self, callback: callable = None) -> None:278        if callback:279            self.__register_listener('ClearLoadedAudioDone', callback)280        super(BasicSICConnector, self).clear_loaded_audio()281    def set_eye_color(self, color: str, callback: callable = None) -> None:282        if callback:283            self.__register_listener('EyeColourDone', callback)284        super(BasicSICConnector, self).set_eye_color(color)285    def set_ear_color(self, color: str, callback: callable = None) -> None:286        if callback:287            self.__register_listener('EarColourDone', callback)288        super(BasicSICConnector, self).set_ear_color(color)289    def set_head_color(self, color: str, callback: callable = None) -> None:290        if callback:291            self.__register_listener('HeadColourDone', callback)292        super(BasicSICConnector, self).set_head_color(color)293    def set_led_color(self, leds: list, colors: list, duration: int = 0, callback: callable = None) -> None:294        if callback:295            self.__register_listener('LedColorDone', callback)296        super(BasicSICConnector, self).set_led_color(leds, colors, duration)297    def start_led_animation(self, led_group: str, anim_type: str, colors: list, speed: int,298                            callback: callable = None) -> None:299        if callback:300            self.__register_listener('LedAnimationDone', callback)301        super(BasicSICConnector, self).start_led_animation(led_group, anim_type, colors, speed)302    def turn(self, degrees: int, callback: callable = None) -> None:303        if callback:304            self.__register_listener('TurnDone', callback)305        super(BasicSICConnector, self).turn(degrees)306    def wake_up(self, callback: callable = None) -> None:307        if callback:308            self.__register_listener('WakeUpDone', callback)309        super(BasicSICConnector, self).wake_up()310    def rest(self, callback: callable = None) -> None:311        if callback:312            self.__register_listener('RestDone', callback)313        super(BasicSICConnector, self).rest()314    def set_breathing(self, enable: bool, callback: callable = None) -> None:315        if callback:316            self.__register_listener('Breathing' + ('Enabled' if enable else 'Disabled'), callback)317        super(BasicSICConnector, self).set_breathing(enable)318    def go_to_posture(self, posture: Enum, speed: int = 100, callback: callable = None) -> None:319        """320        The robot will try for 3 times to reach a position.321        go_to_posture's callback returns a bool indicating whether the given posture was successfully reached.322        """323        if callback:324            self.__register_listener('GoToPostureDone', partial(self.__posture_callback,325                                                                target_posture=posture,326                                                                embedded_callback=callback))327        super(BasicSICConnector, self).go_to_posture(posture.value, speed)328    def __posture_callback(self, target_posture: str, embedded_callback: callable) -> None:329        if self.robot_state['posture'] == target_posture:  # if posture was successfully reached330            embedded_callback(True)  # call the listener to signal a success331        else:  # if the posture was not reached332            embedded_callback(False)  # call the listener to signal a failure333    def set_stiffness(self, joints: list, stiffness: int, duration: int = 1000, callback: callable = None) -> None:334        if callback:335            self.__register_listener('SetStiffnessDone', callback)336        super(BasicSICConnector, self).set_stiffness(joints, stiffness, duration)337    def play_motion(self, motion: str, callback: callable = None) -> None:338        if callback:339            self.__register_listener('PlayMotionDone', callback)340        super(BasicSICConnector, self).play_motion(motion)341    def start_record_motion(self, joint_chains: list, framerate: int = 5, callback: callable = None) -> None:342        if callback:343            self.__register_listener('RecordMotionStarted', callback)344        super(BasicSICConnector, self).start_record_motion(joint_chains, framerate)345    def stop_record_motion(self, callback: callable = None) -> None:346        if callback:347            self.__register_listener('onRobotMotionRecording', callback)348        super(BasicSICConnector, self).stop_record_motion()349    def browser_show(self, html: str, callback: callable = None) -> None:350        super(BasicSICConnector, self).browser_show(html)351    ###########################352    # Robot action Listeners  #353    ###########################354    def subscribe_condition(self, condition: Condition) -> None:355        """356        Subscribe a threading.Condition object that will be notified each time a registered callback is called.357        :param condition: Condition object that will be notified358        :return:359        """360        self.__conditions.append(condition)361    def unsubscribe_condition(self, condition: Condition) -> None:362        """363        Unsubscribe the threading.Condition object.364        :param condition: Condition object to unsubscribe365        :return:366        """367        if condition in self.__conditions:368            self.__conditions.remove(condition)369    def __notify_conditions(self) -> None:370        for condition in self.__conditions:371            with condition:372                condition.notify()373    def __register_listener(self, event: str, callback: callable) -> None:374        self.__listeners[event] = callback375    def __unregister_listener(self, event: str) -> None:376        del self.__listeners[event]377    def __register_vision_listener(self, event: str, callback: callable) -> None:378        self.__vision_listeners[event] = callback379    def __unregister_vision_listener(self, event: str) -> None:380        del self.__vision_listeners[event]381    def __notify_listeners(self, event: str, *args) -> None:382        if event in self.__listeners:383            listener = self.__listeners[event]384            listener(*args)385            self.__notify_conditions()386    def __notify_vision_listeners(self, event: str, *args) -> None:387        if event in self.__vision_listeners:388            listener = self.__vision_listeners[event]389            listener(*args)390            self.__notify_conditions()391    def __notify_touch_listeners(self, event: str, *args) -> None:392        if event in self.__touch_listeners:393            listener = self.__touch_listeners[event]394            listener(*args)395            self.__notify_conditions()...PressureInput.py
Source:PressureInput.py  
...35    strong_action_hysteresis: float = 3036    __listeners: [SipPuffListener] = []37    def __init__(self, sensor: IPressureSensor):38        self.__pressure_sensor = sensor39    def __notify_listeners(self, event: SipPuffEvent):40        if self.debug:41            print("Notifying listeners of event: " + event.__str__())42        for listener in self.__listeners:43            listener.handle_sip_puff_event(event)44    def register_listener(self, listener: SipPuffListener):45        self.__listeners.append(listener)46    def update(self):47        """48        Takes a reading from the underlying sensor and processes it.49        This should be called in a loop externally.50        :return: None, A side effect of this method might be the emission of a :class:SipPuffEvent to listeners51        """52        sensor_value = self.__pressure_sensor.get_pressure_in_Pascal()53        self.__reference_pressure_filter.update(sensor_value)54        reference_value = self.__reference_pressure_filter.get_ambient_pressure_estimation()55        pdiff = sensor_value - reference_value56        if self.debug:57            print("Reference value: " + str(reference_value))58            print("Sensor value: " + str(sensor_value))59            print("Pressure difference: " + str(pdiff))60        if self.__current_state == InputState.IDLE:61            self.__update_IDLE(pdiff)62        elif self.__current_state == InputState.MEASURING:63            self.__update_MEASURING(pdiff)64        elif self.__current_state == InputState.FINISHED_WAITING:65            self.__update_FINISHED_WAITING(pdiff)66        else:67            raise Exception("Untreated enum value for input state: " + self.__current_state.__str__())68    def get_current_duration(self) -> Optional[float]:69        if self.__action_start_time is None:70            return None71        else:72            return time.perf_counter() - self.__action_start_time73    def __get_action_avg_pressure(self) -> float:74        if len(self.__action_pressure_history) < 1:75            return 076        else:77            return sum(self.__action_pressure_history) / len(self.__action_pressure_history)78    def __update_IDLE(self, pdiff):79        # clear bookkeeping80        self.__action_start_time = None81        self.__action_pressure_history.clear()82        # Ignore no sipping or puffing83        if abs(pdiff) < self.weak_action_thresh:84            return85        if self.debug:86            print("Changing mode from to measuring for next cycle")87        # Start the timer and add the observation88        self.__action_start_time = time.perf_counter()89        self.__action_pressure_history.append(pdiff)90        # Change state to Measuring91        self.__current_state = InputState.MEASURING92    def __update_MEASURING(self, pdiff):93        # Check if we are past the duration for a long event94        if self.get_current_duration() > self.long_Action_min_time:95            self.__current_state = InputState.FINISHED_WAITING96            if self.debug:97                print("Changing mode to waiting for next cycle")98            avg_pressure = self.__get_action_avg_pressure()99            # Order matters here. Match the extreme conditions first!100            if avg_pressure < -self.strong_action_thresh:101                self.__notify_listeners(SipPuffEvent.LONG_STRONG_SIP)102            elif avg_pressure > self.strong_action_thresh:103                self.__notify_listeners(SipPuffEvent.LONG_STRONG_PUFF)104            elif avg_pressure < -self.weak_action_thresh:105                self.__notify_listeners(SipPuffEvent.LONG_WEAK_SIP)106            elif avg_pressure > self.weak_action_thresh:107                self.__notify_listeners(SipPuffEvent.LONG_WEAK_PUFF)108            else:109                pass110            return111        # If we slip below the hysteresis around 0, the action is over112        if abs(pdiff) < self.weak_action_thresh - self.weak_action_hysteresis:113            self.__current_state = InputState.IDLE114            # Have we cleared the minimum time for an action?115            if self.get_current_duration() < self.short_action_min_time:116                return117            avg_pressure = self.__get_action_avg_pressure()118            if avg_pressure < -self.strong_action_thresh:119                self.__notify_listeners(SipPuffEvent.SHORT_STRONG_SIP)120            elif avg_pressure > self.strong_action_thresh:121                self.__notify_listeners(SipPuffEvent.SHORT_STRONG_PUFF)122            elif avg_pressure < -self.weak_action_thresh:123                self.__notify_listeners(SipPuffEvent.SHORT_WEAK_SIP)124            elif avg_pressure > self.weak_action_thresh:125                self.__notify_listeners(SipPuffEvent.SHORT_WEAK_PUFF)126            else:127                pass128            return129        # Append measurement at the end if we do not stop130        # We want to ignore the measurement that's falling below the threshold131        self.__action_pressure_history.append(pdiff)132    def __update_FINISHED_WAITING(self, pdiff):133        # This state is for waiting out the time after a long input134        # without triggering a new one135        if abs(pdiff) < self.weak_action_thresh - self.weak_action_hysteresis:136            self.__current_state = InputState.IDLE137            if self.debug:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
