Best Python code snippet using uiautomator
test_flow.py
Source:test_flow.py  
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Guide the user to perform gestures. Record and validate the gestures."""

import fcntl
import glob
import os
import subprocess
import sys
from functools import reduce

import common_util
import firmware_log
import firmware_utils
import fuzzy
import mini_color
import mtb
import touchbotII_robot_wrapper as robot_wrapper
import test_conf as conf
import validators

from firmware_utils import GestureList

sys.path.append('../../bin/input')
import input_device

# Include some constants
from firmware_constants import DEV, GV, MODE, OPTIONS, TFK


class TestFlow:
    """Guide the user to perform gestures. Record and validate the gestures.

    The object is driven by GUI callbacks (keyboard, timeout and io-watch
    callbacks registered on ``win``). For every (gesture, variation) pair it
    either records a new event file with the record program or reuses an
    existing one (replay/resume), validates it and reports the scores.
    """

    def __init__(self, device_geometry, device, keyboard, win, parser, output,
                 test_version, board, firmware_version, options):
        """Store the collaborators and prepare the gesture list.

        @param device_geometry: 4-tuple formatted as '%dx%d+%d+%d' for the
                record program and the screen shot helper.
        @param device: object exposing device_node; also passed to MtbEvemu.
        @param keyboard: object providing get_key_press_event(fd).
        @param win: the test window (prompts, images, callback registration).
        @param parser: object providing parse_file(filename).
        @param output: report sink (print_report, print_window, report_html).
        @param test_version: version string printed at the top of the report.
        @param board: board name, used in file names and by the robot wrapper.
        @param firmware_version: firmware version used in file names.
        @param options: dict indexed by OPTIONS.* keys.
        """
        self.device_geometry = device_geometry
        self.device = device
        self.device_node = self.device.device_node
        self.keyboard = keyboard
        self.firmware_version = firmware_version
        self.output = output
        self.board = board
        self.test_version = test_version
        self.output.print_report('%s' % test_version)
        # Needs output, device_node and device_geometry; sets geometry_str.
        self._get_record_cmd()
        self.win = win
        self.parser = parser
        self.packets = None
        self.gesture_file_name = None
        self.prefix_space = self.output.get_prefix_space()
        self.scores = []
        self.mode = options[OPTIONS.MODE]
        self.fngenerator_only = options[OPTIONS.FNGENERATOR]
        self.iterations = options[OPTIONS.ITERATIONS]
        self.replay_dir = options[OPTIONS.REPLAY]
        self.resume_dir = options[OPTIONS.RESUME]
        self.recording = not any([bool(self.replay_dir), bool(self.resume_dir)])
        self.device_type = (DEV.TOUCHSCREEN if options[OPTIONS.TOUCHSCREEN]
                            else DEV.TOUCHPAD)
        self.robot = robot_wrapper.RobotWrapper(self.board, options)
        self.robot_waiting = False
        # Start above any iteration count so that the first call of
        # _pre_setup_this_gesture_variation() fetches the first gesture.
        self.gv_count = float('infinity')
        gesture_names = self._get_gesture_names()
        order = None
        if self._is_robot_mode():
            order = lambda x: conf.finger_tips_required[x.name]
        self.gesture_list = GestureList(gesture_names).get_gesture_list(order)
        self._get_all_gesture_variations(options[OPTIONS.SIMPLIFIED])
        self.init_flag = False
        self.system_device = self._non_blocking_open(self.device_node)
        self.evdev_device = input_device.InputEvent()
        self.screen_shot = firmware_utils.ScreenShot(self.geometry_str)
        self.mtb_evemu = mtb.MtbEvemu(device)
        self._rename_old_log_and_html_files()
        self._set_static_prompt_messages()
        self.gesture_image_name = None
        self.gesture_continues_flag = False
        self.use_existent_event_file_flag = False

    def __del__(self):
        """Close the device node handle if it was opened."""
        # __init__ may abort before system_device is assigned (for example
        # when the record program is missing), so do not assume it exists.
        system_device = getattr(self, 'system_device', None)
        if system_device is not None:
            system_device.close()

    def _rename_old_log_and_html_files(self):
        """When in replay or resume mode, rename the old log and html files."""
        if self.replay_dir or self.resume_dir:
            for file_type in ['*.log', '*.html']:
                path_names = os.path.join(self.output.log_dir, file_type)
                for old_path_name in glob.glob(path_names):
                    new_path_name = '.'.join([old_path_name, 'old'])
                    os.rename(old_path_name, new_path_name)

    def _is_robot_mode(self):
        """Return True for a real robot run or for MODE.ROBOT_SIM."""
        return self.robot.is_robot_action_mode() or self.mode == MODE.ROBOT_SIM

    def _get_gesture_names(self):
        """Determine the gesture names based on the mode."""
        if self.mode == MODE.QUICKSTEP:
            return conf.gesture_names_quickstep
        elif self.mode == MODE.NOISE:
            return conf.gesture_names_noise_extended
        elif self._is_robot_mode():
            # The mode could be MODE.ROBOT or MODE.ROBOT_SIM.
            # The same gesture names list is used in both modes.
            return conf.gesture_names_robot[self.device_type]
        elif self.mode == MODE.MANUAL:
            # The manual list is:
            #   gesture_names_complete - gesture_names_robot
            #                          - gesture_names_fngenerator_required
            manual_set = (set(conf.gesture_names_complete[self.device_type]) -
                          set(conf.gesture_names_robot[self.device_type]))
            return list(manual_set -
                        set(conf.gesture_names_fngenerator_required))
        elif self.mode == MODE.CALIBRATION:
            return conf.gesture_names_calibration
        else:
            # Filter out tests that need a function generator for COMPLETE mode
            # unless they've indicated that they have one
            return [n for n in conf.gesture_names_complete[self.device_type]
                    if (self.fngenerator_only or
                        n not in conf.gesture_names_fngenerator_required)]

    def _non_blocking_open(self, filename):
        """Open the file in non-blocking mode and return the file object."""
        fd = open(filename)
        # Add O_NONBLOCK to the current status flags instead of replacing
        # them wholesale.
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        return fd

    def _non_blocking_read(self, dev, fd):
        """Non-blocking read on fd.

        @returns: (tv_sec, tv_usec, type, code, value) read through dev, or
                None if the read raised.
        """
        try:
            dev.read(fd)
            event = (dev.tv_sec, dev.tv_usec, dev.type, dev.code, dev.value)
        except Exception:
            # Deliberately best effort: any failure to read is reported to the
            # caller as "no event".
            event = None
        return event

    def _reopen_system_device(self):
        """Close the device and open a new one."""
        self.system_device.close()
        self.system_device = self._non_blocking_open(self.device_node)

    def _set_static_prompt_messages(self):
        """Set static prompt messages."""
        # Prompt for next gesture.
        self._prompt_next = (
                "Press SPACE to save this file and go to next test,\n"
                "      'm'   to save this file and record again,\n"
                "      'd'   to delete this file and try again,\n"
                "      'x'   to discard this file and exit.")
        # Prompt to see test result through timeout callback.
        self._prompt_result = (
                "Perform the gesture now.\n"
                "See the test result on the right after finger lifted.\n"
                "Or press 'x' to exit.")

    def _get_prompt_abnormal_gestures(self, warn_msg):
        """Build the prompt shown when the gesture looks wrong.

        @param warn_msg: the warning line inserted after the headline.
        """
        prompt = '\n'.join(
                ["It is very likely that you perform a WRONG gesture!",
                 warn_msg,
                 "Press 'd'   to delete this file and try again (recommended),",
                 "      SPACE to save this file if you are sure it's correct,",
                 "      'x'   to discard this file and exit."])
        return prompt

    def _get_prompt_no_data(self):
        """Prompt to remind user of performing gestures."""
        prompt = ("You need to perform the specified gestures "
                  "before pressing SPACE.\n")
        return prompt + self._prompt_result

    def _get_record_cmd(self):
        """Get the device event record command.

        Sets record_program, geometry_str and record_cmd. Exits the process
        with status 1 if the record program is reported as missing.
        """
        # Run mtplot with settings to disable clearing the display if the robot
        # clicks the pad, and adding a visible click indicator in the output
        self.record_program = 'mtplot -s1 -c0 -m0'
        if not common_util.program_exists(self.record_program):
            msg = 'Error: the program "%s" does not exist in $PATH.'
            self.output.print_report(msg % self.record_program)
            sys.exit(1)
        display_name = firmware_utils.get_display_name()
        self.geometry_str = '%dx%d+%d+%d' % self.device_geometry
        format_str = '%s %s -d %s -g %s'
        self.record_cmd = format_str % (self.record_program,
                                        self.device_node,
                                        display_name,
                                        self.geometry_str)
        self.output.print_report('Record program: %s' % self.record_cmd)

    def _span_seq(self, seq1, seq2):
        """Span sequence seq1 over sequence seq2.

        E.g., seq1 = (('a', 'b'), 'c')
              seq2 = ('1', ('2', '3'))
              res = (('a', 'b', '1'), ('a', 'b', '2', '3'),
                     ('c', '1'), ('c', '2', '3'))

        E.g., seq1 = ('a', 'b')
              seq2 = ('1', '2', '3')
              res  = (('a', '1'), ('a', '2'), ('a', '3'),
                      ('b', '1'), ('b', '2'), ('b', '3'))

        E.g., seq1 = (('a', 'b'), ('c', 'd'))
              seq2 = ('1', '2', '3')
              res  = (('a', 'b', '1'), ('a', 'b', '2'), ('a', 'b', '3'),
                      ('c', 'd', '1'), ('c', 'd', '2'), ('c', 'd', '3'))
        """
        to_list = lambda s: list(s) if isinstance(s, tuple) else [s]
        return tuple(tuple(to_list(s1) + to_list(s2)) for s1 in seq1
                                                      for s2 in seq2)

    def span_variations(self, seq):
        """Span the variations of a gesture.

        @returns: (None,) when seq is None; the pairwise span of all members
                when the first member is a tuple; otherwise seq unchanged.
        """
        if seq is None:
            return (None,)
        elif isinstance(seq[0], tuple):
            return reduce(self._span_seq, seq)
        else:
            return seq

    def _stop(self):
        """Terminate the recording process."""
        self.record_proc.poll()
        # Terminate the process only when it was not terminated yet.
        if self.record_proc.returncode is None:
            self.record_proc.terminate()
            self.record_proc.wait()
        self.output.print_window('')

    def _get_gesture_image_name(self):
        """Get the gesture file base name without file extension.

        Also sets gesture_image_name to that base name plus '.png'.
        """
        filepath = os.path.splitext(self.gesture_file_name)[0]
        self.gesture_image_name = filepath + '.png'
        return filepath

    def _close_gesture_file(self):
        """Close the gesture file and keep only its 'Event:' lines."""
        if self.gesture_file.closed:
            return
        filename = self.gesture_file.name
        self.gesture_file.close()
        # Strip off the header of the gesture file.
        #
        # Input driver version is 1.0.1
        # Input device ID: bus 0x18 vendor 0x0 product 0x0 version 0x0
        # Input device name: "Atmel maXTouch Touchpad"
        # ...
        # Testing ... (interrupt to exit)
        # Event: time 519.855, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID),
        #                                       value 884
        #
        tmp_filename = filename + '.tmp'
        os.rename(filename, tmp_filename)
        with open(tmp_filename) as src_f:
            with open(filename, 'w') as dst_f:
                for line in src_f:
                    if line.startswith('Event:'):
                        dst_f.write(line)
        os.remove(tmp_filename)

    def _stop_record_and_post_image(self):
        """Stop recording (if any) and show the gesture image in the window.

        When a new file was recorded, the gesture file is closed, a screen
        shot is dumped and the record process is terminated. Otherwise only
        the image name is derived from the existing gesture file name.
        """
        if self.record_new_file:
            self._close_gesture_file()
            self.screen_shot.dump_root(self._get_gesture_image_name())
            self.record_proc.terminate()
            self.record_proc.wait()
        else:
            self._get_gesture_image_name()
        self.win.set_image(self.gesture_image_name)

    def _create_prompt(self, test, variation):
        """Create a color prompt.

        @returns: (msg, color_msg, glog) where msg is the monochrome message,
                color_msg the colored one and glog a GestureLog filled with
                the name, variation and monochrome prompt.
        """
        prompt = test.prompt
        if isinstance(variation, tuple):
            subprompt = reduce(lambda s1, s2: s1 + s2,
                               tuple(test.subprompt[s] for s in variation))
        elif variation is None or test.subprompt is None:
            subprompt = None
        else:
            subprompt = test.subprompt[variation]

        if subprompt is None:
            color_prompt = prompt
            monochrome_prompt = prompt
        else:
            color_prompt = mini_color.color_string(prompt, '{', '}', 'green')
            color_prompt = color_prompt.format(*subprompt)
            monochrome_prompt = prompt.format(*subprompt)

        color_msg_format = mini_color.color_string('\n<%s>:\n%s%s', '<', '>',
                                                   'blue')
        color_msg = color_msg_format % (test.name, self.prefix_space,
                                        color_prompt)
        msg = '%s: %s' % (test.name, monochrome_prompt)

        glog = firmware_log.GestureLog()
        glog.name = test.name
        glog.variation = variation
        glog.prompt = monochrome_prompt

        return (msg, color_msg, glog)

    def _choice_exit(self):
        """Procedure to exit."""
        # Same steps as removing the current recording.
        self._stop_record_and_rm_file()

    def _stop_record_and_rm_file(self):
        """Stop recording process and remove the current gesture file."""
        self._stop()
        if os.path.exists(self.gesture_file_name):
            os.remove(self.gesture_file_name)
            self.output.print_report(self.deleted_msg)

    def _create_gesture_file_name(self, gesture, variation):
        """Create the gesture file name based on its variation.

        Examples of different levels of file naming:
            Primary name:
                pinch_to_zoom.zoom_in-lumpy-fw_11.27
            Root name:
                pinch_to_zoom.zoom_in-lumpy-fw_11.27-manual-20130221_050510
            Base name:
                pinch_to_zoom.zoom_in-lumpy-fw_11.27-manual-20130221_050510.dat

        Also stores the primary name in self.primary_name.
        """
        if variation is None:
            gesture_name = gesture.name
        else:
            if isinstance(variation, tuple):
                name_list = [gesture.name,] + list(variation)
            else:
                name_list = [gesture.name, variation]
            gesture_name = '.'.join(name_list)

        self.primary_name = conf.filename.sep.join([
                gesture_name,
                self.board,
                conf.fw_prefix + self.firmware_version])
        root_name = conf.filename.sep.join([
                self.primary_name,
                self.mode,
                firmware_utils.get_current_time_str()])
        basename = '.'.join([root_name, conf.filename.ext])
        return basename

    def _add_scores(self, new_scores):
        """Add the new scores of a single gesture to the scores list."""
        if new_scores is not None:
            self.scores += new_scores

    def _final_scores(self, scores):
        """Print the final score."""
        # Note: conf.score_aggregator uses a function in fuzzy module.
        # The string is evaluated as a Python expression. It comes from the
        # test_conf module imported above; never feed external input here.
        final_score = eval(conf.score_aggregator)(scores)
        self.output.print_report('\nFinal score: %s\n' % str(final_score))

    def _robot_action(self):
        """Control the robot to perform the action."""
        if self._is_robot_mode() or self.robot.is_manual_noise_test_mode():
            self.robot.configure_noise(self.gesture, self.variation)

        if self._is_robot_mode():
            self.robot.control(self.gesture, self.variation)
            # Once the script terminates start a timeout to clean up if one
            # hasn't already been set to keep the test suite from hanging.
            if not self.gesture_begins_flag:
                self.win.register_timeout_add(self.gesture_timeout_callback,
                                              self.gesture.timeout)

    def _handle_user_choice_save_after_parsing(self, next_gesture=True):
        """Handle user choice for saving the parsed gesture file.

        @param next_gesture: False to record the same gesture variation again
                instead of advancing.
        """
        self.output.print_window('')
        if self.use_existent_event_file_flag or self.recording:
            if self.saved_msg:
                self.output.print_report(self.saved_msg)
            if self.new_scores:
                self._add_scores(self.new_scores)
            self.output.report_html.insert_image(self.gesture_image_name)
            self.output.report_html.flush()
        # After flushing to report_html, reset the gesture_image_name so that
        # it will not be reused by next gesture variation accidentally.
        self.gesture_image_name = None

        if self._pre_setup_this_gesture_variation(next_gesture=next_gesture):
            # There are more gestures.
            self._setup_this_gesture_variation()
            self._robot_action()
        else:
            # No more gesture.
            self._final_scores(self.scores)
            self.output.stop()
            self.output.report_html.stop()
            self.win.stop()
        self.packets = None

    def _handle_user_choice_discard_after_parsing(self):
        """Handle user choice for discarding the parsed gesture file."""
        self.output.print_window('')
        self._setup_this_gesture_variation()
        self._robot_action()
        self.packets = None

    def _handle_user_choice_exit_after_parsing(self):
        """Handle user choice to exit after the gesture file is parsed."""
        self._stop_record_and_rm_file()
        self.output.stop()
        self.output.report_html.stop()
        self.win.stop()

    def check_for_wrong_number_of_fingers(self, details):
        """Detect a gesture performed with too few fingers.

        @param details: list of validator message lines. The two entries after
                'CountTrackingIDValidator' are expected to end with the
                observed count and the criteria value respectively.
        @returns: an error message if fewer tracking IDs than required were
                observed; None otherwise, or if the validator is not listed.
        """
        try:
            position = details.index('CountTrackingIDValidator')
        except ValueError:
            return None

        # An example of the count of tracking IDs:
        #     '    count of trackid IDs: 1'
        number_tracking_ids = int(details[position + 1].split()[-1])
        # An example of the criteria_str looks like:
        #     '    criteria_str: == 2'
        criteria = int(details[position + 2].split()[-1])
        if number_tracking_ids < criteria:
            print('  CountTrackingIDValidator: ')
            print('  number_tracking_ids:  %s' % number_tracking_ids)
            print('  criteria:  %s' % criteria)
            print('  number_tracking_ids should be larger!')
            msg = 'Number of Tracking IDs should be %d instead of %d'
            return msg % (criteria, number_tracking_ids)
        return None

    def _empty_packets_is_legal_result(self):
        """Return True for tap gestures in robot mode."""
        return ('tap' in self.gesture.name and self._is_robot_mode())

    def _handle_user_choice_validate_before_parsing(self):
        """Handle user choice for validating before gesture file is parsed."""
        # Parse the device events. Make sure there are events.
        self.packets = self.parser.parse_file(self.gesture_file_name)
        if self.packets or self._empty_packets_is_legal_result():
            # Validate this gesture and get the results.
            (self.new_scores, msg_list, vlogs) = validators.validate(
                    self.packets, self.gesture, self.variation)

            # If the number of tracking IDs is less than the expected value,
            # the user probably made a wrong gesture.
            error = self.check_for_wrong_number_of_fingers(msg_list)
            if error:
                prompt = self._get_prompt_abnormal_gestures(error)
                color = 'red'
            else:
                prompt = self._prompt_next
                color = 'black'

            self.output.print_window(msg_list)
            self.output.buffer_report(msg_list)
            self.output.report_html.insert_validator_logs(vlogs)
            self.win.set_prompt(prompt, color=color)
            print(prompt)
            self._stop_record_and_post_image()
        else:
            self.win.set_prompt(self._get_prompt_no_data(), color='red')

    def _handle_user_choice_exit_before_parsing(self):
        """Handle user choice to exit before the gesture file is parsed."""
        self._close_gesture_file()
        self._handle_user_choice_exit_after_parsing()

    def _is_parsing_gesture_file_done(self):
        """Is parsing the gesture file done?"""
        return self.packets is not None

    def _is_arrow_key(self, choice):
        """Is this an arrow key?"""
        return (choice in TFK.ARROW_KEY_LIST)

    def user_choice_callback(self, fd, condition):
        """A callback to handle the key pressed by the user.

        This is the primary GUI event-driven method handling the user input.
        Always returns True.
        """
        choice = self.keyboard.get_key_press_event(fd)
        if choice:
            self._handle_keyboard_event(choice)
        return True

    def _handle_keyboard_event(self, choice):
        """Handle the keyboard event."""
        if self._is_arrow_key(choice):
            self.win.scroll(choice)
        elif self.robot_waiting:
            # The user wants the robot to start its action.
            if choice in (TFK.SAVE, TFK.SAVE2):
                self.robot_waiting = False
                self._robot_action()
            # The user wants to exit.
            elif choice == TFK.EXIT:
                self._handle_user_choice_exit_after_parsing()
        elif self._is_parsing_gesture_file_done():
            # Save this gesture file and go to next gesture.
            if choice in (TFK.SAVE, TFK.SAVE2):
                self._handle_user_choice_save_after_parsing()
            # Save this file and perform the same gesture again.
            elif choice == TFK.MORE:
                self._handle_user_choice_save_after_parsing(next_gesture=False)
            # Discard this file and perform the gesture again.
            elif choice == TFK.DISCARD:
                self._handle_user_choice_discard_after_parsing()
            # The user wants to exit.
            elif choice == TFK.EXIT:
                self._handle_user_choice_exit_after_parsing()
            # The user presses any wrong key.
            else:
                self.win.set_prompt(self._prompt_next, color='red')
        else:
            if choice == TFK.EXIT:
                self._handle_user_choice_exit_before_parsing()
            # The user presses any wrong key.
            else:
                self.win.set_prompt(self._prompt_result, color='red')

    def _get_all_gesture_variations(self, simplified):
        """Get all variations for all gestures.

        @param simplified: if true, keep only the first variation of each
                gesture.

        Sets variations_dict (gesture name -> list of variations) and
        gesture_variations (an iterator over (gesture, variation) pairs).
        """
        gesture_variations_list = []
        self.variations_dict = {}
        for gesture in self.gesture_list:
            variations_list = []
            variations = self.span_variations(gesture.variations)
            for variation in variations:
                gesture_variations_list.append((gesture, variation))
                variations_list.append(variation)
                if simplified:
                    break
            self.variations_dict[gesture.name] = variations_list
        self.gesture_variations = iter(gesture_variations_list)

    def gesture_timeout_callback(self):
        """A callback watching whether a gesture has timed out.

        @returns: True only while the gesture is still considered in
                progress; False otherwise.
        """
        if self.replay_dir:
            # There are event files to replay for this gesture variation.
            if self.use_existent_event_file_flag:
                self._handle_user_choice_validate_before_parsing()
            self._handle_user_choice_save_after_parsing(next_gesture=True)
            return False

        # A gesture is stopped only when two conditions are met simultaneously:
        # (1) there are no reported packets for a timeout interval, and
        # (2) the number of tracking IDs is 0.
        elif (self.gesture_continues_flag or
              not self.mtb_evemu.all_fingers_leaving()):
            self.gesture_continues_flag = False
            return True

        else:
            self._handle_user_choice_validate_before_parsing()
            self.win.remove_event_source(self.gesture_file_watch_tag)
            if self._is_robot_mode():
                self._handle_keyboard_event(TFK.SAVE)
            return False

    def gesture_file_watch_callback(self, fd, condition, evdev_device):
        """A callback to watch the device input.

        Drains all pending events into mtb_evemu, marks the gesture as
        continuing and, on the first call for a gesture, registers the
        timeout callback. Always returns True.
        """
        # Read the device node continuously until end
        event = True
        while event:
            event = self._non_blocking_read(evdev_device, fd)
            if event:
                self.mtb_evemu.process_event(event)

        self.gesture_continues_flag = True
        if (not self.gesture_begins_flag):
            self.gesture_begins_flag = True
            self.win.register_timeout_add(self.gesture_timeout_callback,
                                          self.gesture.timeout)
        return True

    def init_gesture_setup_callback(self, widget, event):
        """A callback to set up environment before a user starts a gesture."""
        if not self.init_flag:
            self.init_flag = True
            self._pre_setup_this_gesture_variation()
            self._setup_this_gesture_variation()
            self._robot_action()

    def _get_existent_event_files(self):
        """Get the existent event files that starts with the primary_name."""
        primary_pathnames = os.path.join(self.output.log_dir,
                                         self.primary_name + '*.dat')
        self.primary_gesture_files = glob.glob(primary_pathnames)
        # Reverse sorting the file list so that we could pop from the tail.
        self.primary_gesture_files.sort(reverse=True)

    def _use_existent_event_file(self):
        """If the replay flag is set in the command line, and there exists a
        file(s) with the same primary name, then use the existent file(s)
        instead of recording a new one.

        @returns: True if a file was taken from primary_gesture_files and
                stored in gesture_file_name; False if none is left.
        """
        if self.primary_gesture_files:
            self.gesture_file_name = self.primary_gesture_files.pop()
            return True
        return False

    def _pre_setup_this_gesture_variation(self, next_gesture=True):
        """Get gesture, variation, filename, prompt, etc.

        @param next_gesture: False to stay on the current gesture variation.
        @returns: False when there is no gesture variation left; True
                otherwise.
        """
        next_gesture_first_time = False
        if next_gesture:
            if self.gv_count < self.iterations:
                self.gv_count += 1
            else:
                self.gv_count = 1
                gesture_variation = next(self.gesture_variations, None)
                if gesture_variation is None:
                    return False
                self.gesture, self.variation = gesture_variation
                next_gesture_first_time = True

        basename = self._create_gesture_file_name(self.gesture, self.variation)
        if next_gesture_first_time:
            self._get_existent_event_files()

        if self.replay_dir or self.resume_dir:
            self.use_existent_event_file_flag = self._use_existent_event_file()

        if ((not self.replay_dir and not self.resume_dir) or
                (self.resume_dir and not self.use_existent_event_file_flag)):
            self.gesture_file_name = os.path.join(self.output.log_dir, basename)
            self.saved_msg = '(saved: %s)\n' % self.gesture_file_name
            self.deleted_msg = '(deleted: %s)\n' % self.gesture_file_name
        else:
            self.saved_msg = None
            self.deleted_msg = None
        self.new_scores = None

        if (self.robot.is_robot_action_mode() or
                self.robot.is_manual_noise_test_mode()):
            self.robot.turn_off_noise()

        (msg, color_msg, glog) = self._create_prompt(self.gesture,
                                                     self.variation)
        self.win.set_gesture_name(msg)
        self.output.report_html.insert_gesture_log(glog)
        print(color_msg)
        self.output.print_report(color_msg)
        return True

    def _setup_this_gesture_variation(self):
        """Set up the recording process or use an existent event data file."""
        if self.replay_dir:
            self.record_new_file = False
            self.win.register_timeout_add(self.gesture_timeout_callback, 0)
            return

        if self.resume_dir and self.use_existent_event_file_flag:
            self.record_new_file = False
            self._handle_user_choice_validate_before_parsing()
            self._handle_keyboard_event(TFK.SAVE)
            return

        # Initiate the MtbSanityValidator. Note that this should be done each
        # time just before recording the gesture file since it requires a
        # snapshot of the input device before any finger touching the device.
        self.gesture.mtb_sanity_validator = validators.MtbSanityValidator()

        # Now, we will record a new gesture event file.
        # Fork a new process for mtplot. Add io watch for the gesture file.
        self.record_new_file = True
        self.gesture_file = open(self.gesture_file_name, 'w')
        self.record_proc = subprocess.Popen(self.record_cmd.split(),
                                            stdout=self.gesture_file)

        # Watch if data come in to the monitored file.
        self.gesture_begins_flag = False
        self._reopen_system_device()
        # NOTE(review): the excerpt is cut off right after the
        # self.system_device argument. The last argument below is inferred
        # from gesture_file_watch_callback(fd, condition, evdev_device) --
        # confirm against the complete file.
        self.gesture_file_watch_tag = self.win.register_io_add_watch(
                self.gesture_file_watch_callback, self.system_device,
                self.evdev_device)
...
gesture.py
Source:gesture.py  
...9    g.add_stroke(point_list=[(1,1), (3,4), (2,1)])10    g.normalize()11    # Add it to the database12    gdb = GestureDatabase()13    gdb.add_gesture(g)14    # And for the next gesture, try to find it!15    g2 = Gesture()16    # ...17    gdb.find(g2)18.. warning::19   You don't really want to do this: it's more of an example of how20   to construct gestures dynamically. Typically, you would21   need a lot more points, so it's better to record gestures in a file and22   reload them to compare later. Look in the examples/gestures directory for23   an example of how to do that.24'''25__all__ = ('Gesture', 'GestureDatabase', 'GesturePoint', 'GestureStroke')26import pickle27import base6428import zlib29import math30from kivy.vector import Vector31from io import BytesIO32class GestureDatabase(object):33    '''Class to handle a gesture database.'''34    def __init__(self):35        self.db = []36    def add_gesture(self, gesture):37        '''Add a new gesture to the database.'''38        self.db.append(gesture)39    def find(self, gesture, minscore=0.9, rotation_invariant=True):40        '''Find a matching gesture in the database.'''41        if not gesture:42            return43        best = None44        bestscore = minscore45        for g in self.db:46            score = g.get_score(gesture, rotation_invariant)47            if score < bestscore:48                continue49            bestscore = score50            best = g51        if not best:52            return53        return (bestscore, best)54    def gesture_to_str(self, gesture):55        '''Convert a gesture into a unique string.'''56        io = BytesIO()57        p = pickle.Pickler(io)58        p.dump(gesture)59        data = base64.b64encode(zlib.compress(io.getvalue(), 9))60        return data61    def str_to_gesture(self, data):62        '''Convert a unique string to a gesture.'''63        io = BytesIO(zlib.decompress(base64.b64decode(data)))64        p = pickle.Unpickler(io)65        gesture = 
# NOTE(review): this listing opens with the tail of a definition that starts
# before the visible chunk. Its last two statements were:
#     p.load()
#     return gesture


class GesturePoint:
    '''A single x,y sample of a gesture, stored as floats.'''

    def __init__(self, x, y):
        '''Stores the x,y coordinates of a point in the gesture.'''
        self.x = float(x)
        self.y = float(y)

    def scale(self, factor):
        '''Scales the point in place by the given factor and returns self
        so that calls can be chained.
        '''
        self.x *= factor
        self.y *= factor
        return self

    def __repr__(self):
        return 'Mouse_point: %f,%f' % (self.x, self.y)


class GestureStroke:
    '''Gestures can be made up of multiple strokes.

    `points` holds the GesturePoint objects of the stroke (these are the
    ones that get scaled, centered and resampled); `screenpoints` holds the
    raw (x, y) tuples exactly as they were handed to add_point().
    '''

    def __init__(self):
        '''A stroke in the gesture.'''
        self.points = list()
        self.screenpoints = list()

    # These return the min and max coordinates of the stroke.
    # An empty stroke reports 0 for every bound.
    @property
    def max_x(self):
        if not self.points:
            return 0
        return max(pt.x for pt in self.points)

    @property
    def min_x(self):
        if not self.points:
            return 0
        return min(pt.x for pt in self.points)

    @property
    def max_y(self):
        if not self.points:
            return 0
        return max(pt.y for pt in self.points)

    @property
    def min_y(self):
        if not self.points:
            return 0
        return min(pt.y for pt in self.points)

    def add_point(self, x, y):
        '''
        add_point(x=x_pos, y=y_pos)
        Adds a point to the stroke.
        '''
        self.points.append(GesturePoint(x, y))
        self.screenpoints.append((x, y))

    def scale_stroke(self, scale_factor):
        '''
        scale_stroke(scale_factor=float)
        Multiplies every point of the stroke by scale_factor (in place).
        '''
        self.points = [pt.scale(scale_factor) for pt in self.points]

    def points_distance(self, point1, point2):
        '''
        points_distance(point1=GesturePoint, point2=GesturePoint)
        Returns the distance between two GesturePoints.
        '''
        x = point1.x - point2.x
        y = point1.y - point2.y
        return math.sqrt(x * x + y * y)

    def stroke_length(self, point_list=None):
        '''Finds the length of the stroke. If a point list is given,
           finds the length of that list.
        '''
        if point_list is None:
            point_list = self.points
        gesture_length = 0.0
        # Zero or one point -> zip() yields nothing -> length stays 0.0
        for start, end in zip(point_list, point_list[1:]):
            gesture_length += self.points_distance(start, end)
        return gesture_length

    def normalize_stroke(self, sample_points=32):
        '''Normalizes strokes so that every stroke has a standard number of
           points. Returns True if stroke is normalized, False if it can't be
           normalized. sample_points controls the resolution of the stroke.

           The resampled stroke consists of the first original point followed
           by points placed every (stroke length / sample_points) along the
           original path. Raises ValueError if the resampling did not yield
           exactly sample_points points.
        '''
        # If there is only one point or the length is 0, don't normalize
        if len(self.points) <= 1:
            return False
        total_length = self.stroke_length(self.points)
        if total_length == 0.0:
            return False

        # Distance along the path between two consecutive resampled points.
        # Computed once instead of re-measuring the stroke for every point.
        step = total_length / float(sample_points)
        new_points = [self.points[0]]

        prev = self.points[0]
        covered = 0.0   # path length from the stroke start up to `prev`
        target = step   # path length at which the next point must be placed
        for curr in self.points[1:]:
            seg_length = self.points_distance(prev, curr)
            if seg_length <= 0:
                # Duplicate point: contributes no length, nothing to place.
                continue
            seg_end = covered + seg_length
            x_dir = curr.x - prev.x
            y_dir = curr.y - prev.y
            # Place every pending point that falls inside [prev, curr].
            # `prev` must still be the segment start here; advancing it
            # before interpolating makes x_dir/y_dir zero and collapses all
            # new points onto `curr`.
            # The length cap guards against float rounding producing one
            # extra point when sample_points is not a power of two.
            while target < seg_end and len(new_points) < sample_points:
                ratio = (target - covered) / seg_length
                new_points.append(GesturePoint(prev.x + x_dir * ratio,
                                               prev.y + y_dir * ratio))
                target = step * len(new_points)
            covered = seg_end
            prev = curr

        # If this happens, we are into troubles...
        if not len(new_points) == sample_points:
            raise ValueError('Invalid number of strokes points; got '
                             '%d while it should be %d' %
                             (len(new_points), sample_points))

        self.points = new_points
        return True

    def center_stroke(self, offset_x, offset_y):
        '''Centers the stroke by offsetting the points.'''
        for point in self.points:
            point.x -= offset_x
            point.y -= offset_y


class Gesture:
    '''A python implementation of a gesture recognition algorithm by
    Oleg Dopertchouk: http://www.gamedev.net/reference/articles/article2039.asp
    Implemented by Jeiel Aranal (chemikhazi@gmail.com),
    released into the public domain.
    '''

    # Tolerance for evaluation using the '==' operator
    DEFAULT_TOLERANCE = 0.1

    def __init__(self, tolerance=None):
        '''
        Gesture([tolerance=float])
        Creates a new gesture with an optional matching tolerance value.
        '''
        self.width = 0.
        self.height = 0.
        self.gesture_product = 0.
        self.strokes = list()
        if tolerance is None:
            self.tolerance = Gesture.DEFAULT_TOLERANCE
        else:
            self.tolerance = tolerance

    def _scale_gesture(self):
        ''' Scales down the gesture to a unit of 1.

        Also records the unscaled bounding box in self.width / self.height.
        Returns False when there is nothing to scale (no strokes, or a
        bounding box without extent), True otherwise.
        '''
        # Without strokes min()/max() below would raise ValueError; report
        # "cannot scale" instead, the same way _center_gesture does.
        if not self.strokes:
            return False
        # Bounding box over the per-stroke min/max coordinates
        min_x = min([stroke.min_x for stroke in self.strokes])
        max_x = max([stroke.max_x for stroke in self.strokes])
        min_y = min([stroke.min_y for stroke in self.strokes])
        max_y = max([stroke.max_y for stroke in self.strokes])
        x_len = max_x - min_x
        self.width = x_len
        y_len = max_y - min_y
        self.height = y_len
        scale_factor = max(x_len, y_len)
        if scale_factor <= 0.0:
            return False
        scale_factor = 1.0 / scale_factor
        for stroke in self.strokes:
            stroke.scale_stroke(scale_factor)
        return True

    def _center_gesture(self):
        ''' Centers the Gesture.points of the gesture.

        Subtracts the mean of all points from every point. Returns False if
        the gesture holds no points at all, True otherwise.
        '''
        total_x = 0.0
        total_y = 0.0
        total_points = 0

        for stroke in self.strokes:
            # adds up all the points inside the stroke
            total_y += sum([pt.y for pt in stroke.points])
            total_x += sum([pt.x for pt in stroke.points])
            total_points += len(stroke.points)
        if total_points == 0:
            return False
        # Average to get the offset
        total_x /= total_points
        total_y /= total_points
        # Apply the offset to the strokes
        for stroke in self.strokes:
            stroke.center_stroke(total_x, total_y)
        return True

    def add_stroke(self, point_list=None):
        '''Adds a stroke to the gesture and returns the Stroke instance.
           Optional point_list argument is a list of the mouse points for
           the stroke.

           Entries may be GesturePoint objects or 2-item lists/tuples.
           Raises ValueError for a point_list that is not a list/tuple or
           for an entry that does not have exactly 2 values, and TypeError
           for an entry of any other type.
        '''
        self.strokes.append(GestureStroke())
        if isinstance(point_list, (list, tuple)):
            for point in point_list:
                if isinstance(point, GesturePoint):
                    self.strokes[-1].points.append(point)
                elif isinstance(point, (list, tuple)):
                    if len(point) != 2:
                        raise ValueError("Stroke entry must have 2 values max")
                    self.strokes[-1].add_point(point[0], point[1])
                else:
                    raise TypeError("The point list should either be "
                                    "tuples of x and y or a list of "
                                    "GesturePoint objects")
        elif point_list is not None:
            raise ValueError("point_list should be a tuple/list")

        return self.strokes[-1]

    def normalize(self, stroke_samples=32):
        '''Runs the gesture normalization algorithm and calculates the dot
        product with self.

        Returns False (and sets gesture_product to False) when the gesture
        cannot be scaled or centered. On success nothing is returned; the
        result is stored in self.gesture_product.
        '''
        if not self._scale_gesture() or not self._center_gesture():
            self.gesture_product = False
            return False
        for stroke in self.strokes:
            stroke.normalize_stroke(stroke_samples)
        self.gesture_product = self.dot_product(self)

    def get_rigid_rotation(self, dstpts):
        '''
        Extract the rotation to apply to a group of points to minimize the
        distance to a second group of points. The two groups of points are
        assumed to be centered. This is a simple version that just picks
        an angle based on the first point of the gesture.
        '''
        if len(self.strokes) < 1 or len(self.strokes[0].points) < 1:
            return 0
        # NOTE(review): the source listing is cut off at this point ("...");
        # the remainder of this method and of the class is not visible here
        # and has deliberately not been reconstructed. The listing then
        # continues with the next snippet, gestures.py.
Source:gestures.py  
"""Gesture detection for SNAP Badge, using the LIS3DH accelerometer.

Note: this could be enhanced to use the LIS3D hardware for detection, with the
      advantage of much lower cpu utilization and better performance including
      wake on interrupt. The initial software based implementation has the
      advantage of being easy to understand and experiment with.
"""

from drivers.lis3dh_accel import *

# Detected gestures
GESTURE_DOWN = 0     # Badge held vertical, then flipped down face-up
GESTURE_ZERO_G = 1   # Badge experienced zero-g condition
GESTURE_HIGH_G = 2   # Badge experienced high-g condition

# Double tap configuration parameters
DT_CFG1 = 0x77      # ODR = 400Hz
DT_CFG2 = 0x84      # Enable High-pass filter
DT_CFG3 = 0xc0      # Set INT1 pin
DT_SRC = 0x20
DT_THRESHOLD = 0x50
DT_CFG = 0x20
DT_LIMIT = 0x17     # 57.5ms
DT_LATENCY = 0x25   # 92.5ms
DT_WINDOW = 0x30    # 120ms

# Remaining polls during which gesture detection is skipped
gesture_debounce = 20
# Function invoked with the gesture type when a gesture is detected
gesture_cb = None


def gesture_set_callback(cb):
    """Register the function to call when a gesture is detected.

    The callback is passed the detected gesture type as its only argument.
         ex: def my_gestures(gesture_type): ...
    """
    global gesture_cb
    gesture_cb = cb


def gesture_update_accel():
    """Remember the current accelerometer axis readings.

    The stored values are compared against the next poll's readings to get
    the per-axis change ('diff').
    """
    global gest_last_x, gest_last_y, gest_last_z
    gest_last_z = lis_axis_z
    gest_last_y = lis_axis_y
    gest_last_x = lis_axis_x


def gesture_poll_10ms():
    """Called by system every 10ms"""
    global gesture_debounce

    # Refresh the accelerometer readings
    lis_read()

    if gesture_debounce:
        # Still debouncing: count down and skip detection for this poll
        gesture_debounce -= 1
    elif lis_axis_z - gest_last_z > 6000:
        #print lis_axis_x, ",", lis_axis_y, ",", lis_axis_z
        #print "dxyz=", lis_axis_x - rps_last_x, ",",lis_axis_y - rps_last_y, ",",lis_axis_z - rps_last_z

        # Z axis jumped by more than 6000 since the last poll:
        # report GESTURE_DOWN, then restart the debounce countdown
        if gesture_cb:
            gesture_cb(GESTURE_DOWN)
        gesture_debounce = 20

    gesture_update_accel()


def gesture_set_zero_g():
    """Configure accelerometer to detect zero-g condition and set interrupt"""
    lis_int1(0x95, 0x10, 0x10)


def gesture_set_high_g():
    """Configure accelerometer to detect high-g condition and set interrupt"""
    lis_int1(0x2a, 0x30, 0x00)


def gesture_set_double_tap():
    """Configure accelerometer to detect double tap and set interrupt"""
    lis_ctrl(1, DT_CFG1)
    lis_ctrl(2, DT_CFG2)
    lis_ctrl(3, DT_CFG3)
    lis_tap_cfg(DT_SRC, DT_THRESHOLD, DT_CFG, DT_LIMIT, DT_LATENCY, DT_WINDOW)

# ...Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
