Best Python code snippet using Testify_python
interrupt_handler.py
Source:interrupt_handler.py  
1#!/usr/bin/env python32#3# Copyright 2014 The Chromium OS Authors. All rights reserved.4# Use of this source code is governed by a BSD-style license that can be5# found in the LICENSE file.6"""Handles Whale's button click event."""7import argparse8import functools9import logging10import os11import re12import sys13import time14from cros.factory.test.fixture.whale import keyboard_emulator15from cros.factory.test.fixture.whale import serial_client16from cros.factory.test.fixture.whale import servo_client17from cros.factory.utils import gpio_utils18from cros.factory.utils import process_utils19from cros.factory.utils import ssh_utils20from cros.factory.utils import type_utils21ActionType = type_utils.Enum(['PUSH_NEEDLE', 'FIXTURE_STARTED'])22def TimeClassMethodDebug(func):23  """A decorator to log method running time on debug level."""24  @functools.wraps(func)25  def Wrapped(*args, **kwargs):26    logging.debug('Invoking %s()', func.__name__)27    start_time = time.time()28    result = func(*args, **kwargs)29    logging.debug('%s() finished in %.4f secs', func.__name__,30                  time.time() - start_time)31    return result32  return Wrapped33class InterruptHandler:34  """Waits for Whale's I/O expanders' interrupt and dispatches it.35  It connects to BeagleBone's servod and polld, where servod is used to get36  I/O expanders' input status and reset SR latches; polld is used to wait37  GPIO 7, the interrupt pin from Whale's I/O expanders.38  """39  # Shortcuts to Whale's button and control dict.40  _BUTTON = servo_client.WHALE_BUTTON41  _CONTROL = servo_client.WHALE_CONTROL42  _FIXTURE_FEEDBACK = servo_client.FIXTURE_FEEDBACK43  _PLANKTON_FEEDBACK = servo_client.PLANKTON_FEEDBACK44  _WHALE_DEBUG_MODE_EN = servo_client.WHALE_DEBUG_MODE_EN45  # List of buttons and feedbacks to scan.46  # Difference between button and feedback is: button is latched;47  #     no latch for feedback.48  _BUTTON_LIST = servo_client.WHALE_BUTTONS49  _FEEDBACK_LIST = servo_client.WHALE_FEEDBACKS50  # Buttons that operator can use (non debug mode).51  _OPERATOR_BUTTON_LIST = (_BUTTON.FIXTURE_START, _BUTTON.FIXTURE_STOP)52  # DUT sensor check list, add (FEEDBACK, Bool) to check if MLB exists.53  # example:54  # _DUT_SENSOR_CHECK_LIST = dict([55  #  (_FIXTURE_FEEDBACK.FB8, True),56  #  (_FIXTURE_FEEDBACK.FB9, False)])57  _DUT_SENSOR_CHECK_LIST = dict()58  _INPUT_LIST = _BUTTON_LIST + _FEEDBACK_LIST59  _INPUT_INTERRUPT_GPIO = 760  # Used to avoid toggle battery too fast.61  _BATTERY_CEASE_TOGGLE_SECS = 1.062  _FixtureState = type_utils.Enum(63      ['WAIT', 'CLOSED', 'ERR_CLOSING', 'CLOSING', 'OPENING'])64  # Fixture state to LED light and LCD message (green, red, message).65  _FixtureStateParams = {66      _FixtureState.WAIT: ('on', 'on', 'ready'),67      _FixtureState.CLOSED: ('off', 'off', 'closed'),68      _FixtureState.ERR_CLOSING: ('off', 'on', '!!no board inside!!'),69      _FixtureState.CLOSING: ('off', 'on', 'closing'),70      _FixtureState.OPENING: ('off', 'on', 'opening')}71  def __init__(self, host, polld_port, servod_port, dolphin_port, rpc_debug,72               polling_wait_secs):73    """Constructor.74    Args:75      host: BeagleBone's hostname or IP address.76      polld_port: port that polld listens. Set to None if not using polld.77      servod_port: port that servod listens.78      dolphin_port: port that dolphin server listens. Set to None if not using79          dolphin server.80      rpc_debug: True to enable XMLRPC debug message.81      polling_wait_secs: # seconds for polling button clicking event.82    """83    self._poll = gpio_utils.GpioManager(84        use_polld=polld_port is not None, host=host, tcp_port=polld_port,85        verbose=rpc_debug)86    self._dolphin = None87    if dolphin_port:88      self._dolphin = serial_client.SerialClient(89          host=host, tcp_port=dolphin_port, verbose=rpc_debug)90    self._servo = servo_client.ServoClient(host=host, port=servod_port,91                                           verbose=rpc_debug)92    self._polling_wait_secs = polling_wait_secs93    # Store last feedback value. The value is initialzed in the very first94    # ScanFeedback call.95    self._last_feedback = {}96    self._starting_fixture_action = None97    # Used to avoid toggle battery too fast.98    self._last_battery_toggle_time = time.time()99  @TimeClassMethodDebug100  def Init(self):101    """Resets button latch and records feedback value."""102    self._last_feedback = self._servo.MultipleIsOn(self._FEEDBACK_LIST)103    self._servo.MultipleSet([(self._CONTROL.LCM_CMD, 'clear'),104                             (self._CONTROL.LCM_TEXT, 'Initializing...')])105    self.ResetLatch()106    self.ResetInterrupt()107    self.ResetKeyboard()108    # Initial fixture state: cover open.109    self._HandleStopFixture(show_state=False)110    self._SetState(self._FixtureState.WAIT)111  def ResetKeyboard(self):112    keyboard = keyboard_emulator.KeyboardEmulator(self._servo)113    keyboard.SimulateKeystrokes()114  def _SetState(self, state):115    green, red, message = self._FixtureStateParams[state]116    self._servo.MultipleSet([(self._CONTROL.PASS_LED, green),117                             (self._CONTROL.FAIL_LED, red),118                             (self._CONTROL.LCM_CMD, 'clear'),119                             (self._CONTROL.LCM_TEXT, message)])120    self.ShowNucIpOnLED()121  def _IsMLBInFixture(self):122    """Checks MLB(s) is(are) inside the fixture.123    If the project has only one board, check DUT_SENSOR is enough. For two124    boards project, ex. lid and base boards, check DUT_SENSOR and BASE_SENSOR.125    Returns:126      True if MLB(s) is(are) inside the fixture; otherwise False.127    """128    if not self._DUT_SENSOR_CHECK_LIST:129      logging.info('No dut sensor...')130      return True131    dut_sensor_list = list(self._DUT_SENSOR_CHECK_LIST)132    dut_sensor_status = self._servo.MultipleIsOn(dut_sensor_list)133    return dut_sensor_status == self._DUT_SENSOR_CHECK_LIST134  @TimeClassMethodDebug135  def _HandleStopFixture(self, show_state=True):136    """Stop Fixture Step"""137    logging.info('Stopping fixture...')138    if show_state:139      self._SetState(self._FixtureState.OPENING)140    # Disable battery first for safety.141    self._servo.Disable(self._CONTROL.BATTERY)142    while True:143      feedback_status = self._servo.MultipleIsOn(self._FEEDBACK_LIST)144      if (not feedback_status[self._FIXTURE_FEEDBACK.FB1] or145          not feedback_status[self._FIXTURE_FEEDBACK.FB3]):146        self._servo.Disable(self._CONTROL.FIXTURE_PUSH_NEEDLE)147        continue148      self._starting_fixture_action = None149      logging.info('[Fixture stopped]')150      break151    self._SetState(self._FixtureState.WAIT)152  @TimeClassMethodDebug153  def _HandleStartFixtureFeedbackChange(self, feedback_status):154    """Processing Start Fixture feedback information"""155    if (self._starting_fixture_action is not None and156        self._starting_fixture_action != ActionType.FIXTURE_STARTED):157      # we are closing the fixture, check if we detect a hand158      if feedback_status[self._FIXTURE_FEEDBACK.FB5]:159        # detect hand, abort160        self._HandleStopFixture()161        return162    if self._servo.IsOn(self._BUTTON.FIXTURE_START):163      if (self._starting_fixture_action == ActionType.PUSH_NEEDLE and164          feedback_status[self._FIXTURE_FEEDBACK.FB2] and165          feedback_status[self._FIXTURE_FEEDBACK.FB4]):166        logging.info('[HandleStartFixture] fixture closed')167        self._starting_fixture_action = ActionType.FIXTURE_STARTED168        self._SetState(self._FixtureState.CLOSED)169  @TimeClassMethodDebug170  def _HandleStartFixture(self):171    """Start Fixture Step"""172    logging.info('[Fixture Start ...]')173    if self._starting_fixture_action == ActionType.FIXTURE_STARTED:174      logging.info('[HandleStartFixture] ACTION = FIXTURE_STARTED')175      return176    if self._last_feedback[self._FIXTURE_FEEDBACK.FB5]:177      logging.info('[HandleStartFixture] Detect Hands, stop..')178      return179    if self._starting_fixture_action is None:180      if not self._IsMLBInFixture():181        logging.info(182            '[HandleStartFixture] OOPS! Cannot close cover without MLBs')183        self._SetState(self._FixtureState.ERR_CLOSING)184        return185      self._ResetWhaleDeviceBeforeClosing()186      self._ResetDolphinDeviceBeforeClosing()187      self._starting_fixture_action = ActionType.PUSH_NEEDLE188      self._SetState(self._FixtureState.CLOSING)189    if self._starting_fixture_action == ActionType.PUSH_NEEDLE:190      logging.info('[HandleStartFixture] pushing needle')191      self._servo.Enable(self._CONTROL.FIXTURE_PUSH_NEEDLE)192  @TimeClassMethodDebug193  def _ResetWhaleDeviceBeforeClosing(self):194    """Resets devices on Whale if necessary before closing fixture."""195    # Release DUT CC2 pull-high196    self._servo.Disable(self._CONTROL.DC)197    self._servo.Disable(self._CONTROL.OUTPUT_RESERVE_1)198  @TimeClassMethodDebug199  def _ResetDolphinDeviceBeforeClosing(self):200    """Resets Dolphin if necessary before closing fixture."""201    if self._dolphin is None:202      return203    # Set dolphin to discharging mode, if dolphin is charging, DUT will fail to204    # boot up after battery connection.205    # Assuming all serial connections are connected to Dolphin.206    serial_amount = self._dolphin.GetSerialAmount()207    for serial_index in range(serial_amount):208      self._dolphin.Send(serial_index, 'usbc_action dev')209  @TimeClassMethodDebug210  def _ToggleBattery(self):211    """Toggles battery status.212    If battery is on, switches it to off and vise versa.213    """214    if (time.time() - self._last_battery_toggle_time <215        self._BATTERY_CEASE_TOGGLE_SECS):216      logging.debug('Toggle too fast, cease toggle for %f second.',217                    self._BATTERY_CEASE_TOGGLE_SECS)218      return219    new_battery_status = ('off' if self._servo.IsOn(self._CONTROL.BATTERY)220                          else 'on')221    logging.info('[Toggle battery to %s]', new_battery_status)222    self._servo.Set(self._CONTROL.BATTERY, new_battery_status)223    self._last_battery_toggle_time = time.time()224  @TimeClassMethodDebug225  def ScanButton(self):226    """Scans all buttons and invokes button click handler for clicked buttons.227    Returns:228      True if a button is clicked.229    """230    logging.debug('[Scanning button....]')231    status = self._servo.MultipleIsOn(self._BUTTON_LIST)232    if status[self._BUTTON.FIXTURE_STOP]:233      logging.info('Calling _HandleStopFixture because FIXTURE_STOP is True.')234      self._HandleStopFixture()235      # Disable stop button, and use 'i2cset' to set it back to input mode.236      self._servo.Disable(self._BUTTON.FIXTURE_STOP)237      process_utils.Spawn(['i2cset', '-y', '1', '0x77', '0x07', '0xff'])238      return True239    if (self._starting_fixture_action != ActionType.FIXTURE_STARTED and240        self._starting_fixture_action is not None and241        not status[self._BUTTON.FIXTURE_START]):242      logging.info('Calling _HandleStopFixture because FIXTURE_START is False.')243      self._HandleStopFixture()244      return False245    button_clicked = any(status.values())246    if not button_clicked:247      return False248    operator_mode = not self._servo.IsOn(self._WHALE_DEBUG_MODE_EN)249    for button, clicked in status.items():250      if not clicked:251        continue252      if operator_mode and button not in self._OPERATOR_BUTTON_LIST:253        logging.debug('Supress button %s click because debug mode is off.',254                      button)255        continue256      if button == self._BUTTON.FIXTURE_START:257        if self._starting_fixture_action == ActionType.FIXTURE_STARTED:258          logging.info('[START] ACTION = FIXTURE_STARTED')259        else:260          self._HandleStartFixture()261      elif button == self._BUTTON.RESERVE_1:262        self._ToggleBattery()263      logging.info('Button %s clicked', button)264    return button_clicked265  @TimeClassMethodDebug266  def ScanFeedback(self):267    """Scans all feedback and invokes handler for those changed feedback.268    Returns:269      True if any feedback value is clicked.270    """271    logging.debug('[Scanning feedback....]')272    feedback_status = self._servo.MultipleIsOn(self._FEEDBACK_LIST)273    feedback_changed = False274    for name, value in feedback_status.items():275      if self._last_feedback[name] == value:276        continue277      self._HandleStartFixtureFeedbackChange(feedback_status)278      logging.info('Feedback %s value changed to %r', name, value)279      self._last_feedback[name] = value280      feedback_changed = True281    return feedback_changed282  @TimeClassMethodDebug283  def ResetLatch(self):284    """Resets SR latch for buttons."""285    self._servo.Click(self._CONTROL.INPUT_RESET)286  @TimeClassMethodDebug287  def WaitForInterrupt(self):288    logging.debug('Polling interrupt (GPIO %d %s) for %r seconds',289                  self._INPUT_INTERRUPT_GPIO, self._poll.GPIO_EDGE_FALLING,290                  self._polling_wait_secs)291    if self._poll.Poll(self._INPUT_INTERRUPT_GPIO,292                       self._poll.GPIO_EDGE_FALLING,293                       self._polling_wait_secs):294      logging.debug('Interrupt polled')295    else:296      logging.debug('Polling interrupt timeout')297  @TimeClassMethodDebug298  def ResetInterrupt(self):299    """Resets I/O expanders' interrupt.300    We have four I/O expanders (TCA9539), three of them have inputs. As301    BeagleBone can only accept one interrupt, we cascade two expanders'302    (0x75, 0x77) INT to 0x76 input pins. So any input changes from 0x75, 0x76,303    0x77 will generate interrupt to BeagleBone.304    According to TCA9539 manual:305        "resetting the interrupt circuit is achieved when data on the port is306         changed to the original setting or data is read from the port that307         generated the interrupt. ... Because each 8-bit port is read308         independently, the interrupt caused by port 0 is not cleared by a read309         of port 1, or vice versa",310    to reset interrupt, we need to read each changing bit. However, as servod311    reads a byte each time we read an input pin, so we only need to read 0x77312    byte-0, byte-1, 0x75 byte-1, and 0x76 byte-0, byte-1, in sequence to reset313    INT. The reason to read in sequence is that we need to read 0x76 at last314    as 0x77 and 0x75 INT reset could change P02 and P03 pin in 0x76.315    """316    # Touch I/O expander 0x77 byte 0 & 1, 0x75 byte 1, 0x76 byte 0 & 1.317    # Note that we skip I/O expander 0x75 byte-0 as it contains no input318    # pin, won't trigger interrupt.319    self._servo.MultipleGet([320        self._FIXTURE_FEEDBACK.FB1, self._BUTTON.FIXTURE_START,321        self._PLANKTON_FEEDBACK.FB1, self._WHALE_DEBUG_MODE_EN,322        self._BUTTON.RESERVE_1])323  def Run(self):324    """Waits for Whale's button click interrupt and dispatches it."""325    while True:326      button_clicked = self.ScanButton()327      feedback_changed = self.ScanFeedback()328      # The reason why we don't poll interrupt right after reset latch is that329      # it might be possible that a button is clicked after latch is cleared330      # but before I/O expander is touched. In this case, the button is latched331      # but the interrupt is consumed (after touching I/O expander) so that the332      # following click of that button won't trigger interrupt again, and333      # polling is blocked.334      #335      # The solution is to read button again without waiting for interrupt.336      if button_clicked or feedback_changed:337        if button_clicked:338          self.ResetLatch()339        self.ResetInterrupt()340        continue341      self.WaitForInterrupt()342  def ShowNucIpOnLED(self):343    """Shows NUC dongle IP on LED second line"""344    nuc_host = '192.168.234.1'345    testing_rsa_path = '/usr/local/factory/misc/sshkeys/testing_rsa'346    get_dongle_eth_script = (347        'timeout 1s /usr/local/factory/py/test/fixture/get_dongle_eth.sh')348    # Make identity file less open to make ssh happy349    os.chmod(testing_rsa_path, 0o600)350    ssh_command_base = ssh_utils.BuildSSHCommand(351        identity_file=testing_rsa_path)352    try:353      interface = process_utils.SpawnOutput(354          ssh_command_base + [nuc_host, get_dongle_eth_script]).strip()355    except BaseException:356      interface = None357    if not interface:358      ip_address = 'dongle not found...'359    else:360      ifconfig_command = 'ifconfig %s' % interface361      ifconfig_result = process_utils.SpawnOutput(362          ssh_command_base + [nuc_host, ifconfig_command]).strip()363      ip_matcher = re.search(r'inet (\d+\.\d+\.\d+\.\d+)', ifconfig_result,364                             re.MULTILINE)365      if not ip_matcher:366        ip_address = 'dongle not found...'367      else:368        ip_address = ip_matcher.group(1)369    self._servo.MultipleSet([(self._CONTROL.LCM_ROW, 'r1'),370                             (self._CONTROL.LCM_TEXT, ip_address)])371def ParseArgs():372  """Parses command line arguments.373  Returns:374    args from argparse.parse_args().375  """376  description = (377      'Handle Whale button click event.'378  )379  parser = argparse.ArgumentParser(380      formatter_class=argparse.RawTextHelpFormatter, description=description)381  parser.add_argument('-d', '--debug', action='store_true', default=False,382                      help='enable debug messages')383  parser.add_argument('--rpc_debug', action='store_true', default=False,384                      help='enable debug messages for XMLRPC call')385  parser.add_argument('--nouse_dolphin', action='store_false', default=True,386                      dest='use_dolphin', help='whether to skip dolphin control'387                      ' (remote server). default: %(default)s')388  parser.add_argument('--use_polld', action='store_true', default=False,389                      help='whether to use polld (for polling GPIO port on '390                      'remote server) or poll local GPIO port, default: '391                      '%(default)s')392  parser.add_argument('--host', default='127.0.0.1', type=str,393                      help='hostname of server, default: %(default)s')394  parser.add_argument('--dolphin_port', default=9997, type=int,395                      help='port that dolphin_server listens, default: '396                      '%(default)d')397  parser.add_argument('--polld_port', default=9998, type=int,398                      help='port that polld listens, default: %(default)d')399  parser.add_argument('--servod_port', default=9999, type=int,400                      help='port that servod listens, default: %(default)d')401  parser.add_argument('--polling_wait_secs', default=5, type=int,402                      help=('# seconds for polling button clicking event, '403                            'default: %(default)d'))404  return parser.parse_args()405def main():406  args = ParseArgs()407  logging.basicConfig(408      level=logging.DEBUG if args.debug else logging.INFO,409      format='%(asctime)s - %(levelname)s - %(message)s')410  polld_port = args.polld_port if args.use_polld else None411  dolphin_port = args.dolphin_port if args.use_dolphin else None412  handler = InterruptHandler(args.host, polld_port, args.servod_port,413                             dolphin_port, args.rpc_debug,414                             args.polling_wait_secs)415  handler.Init()416  handler.Run()417if __name__ == '__main__':418  try:419    main()420  except KeyboardInterrupt:421    sys.exit(0)422  except gpio_utils.GpioManagerError as e:423    sys.stderr.write(str(e) + '\n')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
