How to use fixture_start method in Testify

Best Python code snippet using Testify_python

interrupt_handler.py

Source: interrupt_handler.py (GitHub)

copy

Full Screen

#!/usr/bin/env python3
#
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Handles Whale's button click event."""

import argparse
import functools
import logging
import os
import re
import sys
import time

from cros.factory.test.fixture.whale import keyboard_emulator
from cros.factory.test.fixture.whale import serial_client
from cros.factory.test.fixture.whale import servo_client
from cros.factory.utils import gpio_utils
from cros.factory.utils import process_utils
from cros.factory.utils import ssh_utils
from cros.factory.utils import type_utils


# Progress of the "close fixture" sequence. InterruptHandler keeps the current
# value in _starting_fixture_action; None there means no sequence is running.
ActionType = type_utils.Enum(['PUSH_NEEDLE', 'FIXTURE_STARTED'])


def TimeClassMethodDebug(func):
  """A decorator to log method running time on debug level.

  Args:
    func: the callable to wrap.

  Returns:
    A wrapper that logs the call and its wall-clock duration at debug level
    and returns whatever func returns.
  """
  @functools.wraps(func)
  def Wrapped(*args, **kwargs):
    logging.debug('Invoking %s()', func.__name__)
    start_time = time.time()
    result = func(*args, **kwargs)
    logging.debug('%s() finished in %.4f secs', func.__name__,
                  time.time() - start_time)
    return result
  return Wrapped


class InterruptHandler:
  """Waits for Whale's I/O expanders' interrupt and dispatches it.

  It connects to BeagleBone's servod and polld, where servod is used to get
  I/O expanders' input status and reset SR latches; polld is used to wait
  GPIO 7, the interrupt pin from Whale's I/O expanders.
  """

  # Shortcuts to Whale's button and control dict.
  _BUTTON = servo_client.WHALE_BUTTON
  _CONTROL = servo_client.WHALE_CONTROL
  _FIXTURE_FEEDBACK = servo_client.FIXTURE_FEEDBACK
  _PLANKTON_FEEDBACK = servo_client.PLANKTON_FEEDBACK
  _WHALE_DEBUG_MODE_EN = servo_client.WHALE_DEBUG_MODE_EN

  # List of buttons and feedbacks to scan.
  # Difference between button and feedback is: button is latched;
  # no latch for feedback.
  _BUTTON_LIST = servo_client.WHALE_BUTTONS
  _FEEDBACK_LIST = servo_client.WHALE_FEEDBACKS

  # Buttons that operator can use (non debug mode).
  _OPERATOR_BUTTON_LIST = (_BUTTON.FIXTURE_START, _BUTTON.FIXTURE_STOP)

  # DUT sensor check list, add (FEEDBACK, Bool) to check if MLB exists.
  # example:
  #   _DUT_SENSOR_CHECK_LIST = {
  #       _FIXTURE_FEEDBACK.FB8: True,
  #       _FIXTURE_FEEDBACK.FB9: False}
  _DUT_SENSOR_CHECK_LIST = {}

  _INPUT_LIST = _BUTTON_LIST + _FEEDBACK_LIST

  _INPUT_INTERRUPT_GPIO = 7

  # Used to avoid toggle battery too fast.
  _BATTERY_CEASE_TOGGLE_SECS = 1.0

  _FixtureState = type_utils.Enum(
      ['WAIT', 'CLOSED', 'ERR_CLOSING', 'CLOSING', 'OPENING'])

  # Fixture state to LED light and LCD message (green, red, message).
  _FixtureStateParams = {
      _FixtureState.WAIT: ('on', 'on', 'ready'),
      _FixtureState.CLOSED: ('off', 'off', 'closed'),
      _FixtureState.ERR_CLOSING: ('off', 'on', '!!no board inside!!'),
      _FixtureState.CLOSING: ('off', 'on', 'closing'),
      _FixtureState.OPENING: ('off', 'on', 'opening')}

  def __init__(self, host, polld_port, servod_port, dolphin_port, rpc_debug,
               polling_wait_secs):
    """Constructor.

    Args:
      host: BeagleBone's hostname or IP address.
      polld_port: port that polld listens. Set to None if not using polld.
      servod_port: port that servod listens.
      dolphin_port: port that dolphin server listens. Set to None if not using
          dolphin server.
      rpc_debug: True to enable XMLRPC debug message.
      polling_wait_secs: # seconds for polling button clicking event.
    """
    self._poll = gpio_utils.GpioManager(
        use_polld=polld_port is not None, host=host, tcp_port=polld_port,
        verbose=rpc_debug)

    self._dolphin = None
    if dolphin_port:
      self._dolphin = serial_client.SerialClient(
          host=host, tcp_port=dolphin_port, verbose=rpc_debug)

    self._servo = servo_client.ServoClient(host=host, port=servod_port,
                                           verbose=rpc_debug)

    self._polling_wait_secs = polling_wait_secs

    # Last seen feedback values, keyed by feedback name. Populated by Init();
    # ScanFeedback() compares fresh readings against it, so Init() must run
    # before the first scan.
    self._last_feedback = {}

    # None when idle, otherwise an ActionType describing how far the
    # close-fixture sequence has progressed.
    self._starting_fixture_action = None

    # Used to avoid toggle battery too fast.
    self._last_battery_toggle_time = time.time()

  @TimeClassMethodDebug
  def Init(self):
    """Resets button latch and records feedback value."""
    self._last_feedback = self._servo.MultipleIsOn(self._FEEDBACK_LIST)

    self._servo.MultipleSet([(self._CONTROL.LCM_CMD, 'clear'),
                             (self._CONTROL.LCM_TEXT, 'Initializing...')])
    self.ResetLatch()
    self.ResetInterrupt()
    self.ResetKeyboard()

    # Initial fixture state: cover open.
    self._HandleStopFixture(show_state=False)
    self._SetState(self._FixtureState.WAIT)

  def ResetKeyboard(self):
    """Runs KeyboardEmulator.SimulateKeystrokes() through the servo client."""
    keyboard = keyboard_emulator.KeyboardEmulator(self._servo)
    keyboard.SimulateKeystrokes()

  def _SetState(self, state):
    """Shows a fixture state on the LEDs and the LCD.

    Args:
      state: a _FixtureState value; it selects the (green, red, message)
          triple from _FixtureStateParams.
    """
    green, red, message = self._FixtureStateParams[state]
    self._servo.MultipleSet([(self._CONTROL.PASS_LED, green),
                             (self._CONTROL.FAIL_LED, red),
                             (self._CONTROL.LCM_CMD, 'clear'),
                             (self._CONTROL.LCM_TEXT, message)])
    # The LCD was just cleared, so redraw the NUC dongle IP row as well.
    self.ShowNucIpOnLED()

  def _IsMLBInFixture(self):
    """Checks MLB(s) is(are) inside the fixture.

    If the project has only one board, check DUT_SENSOR is enough. For two
    boards project, ex. lid and base boards, check DUT_SENSOR and BASE_SENSOR.

    Returns:
      True if MLB(s) is(are) inside the fixture; otherwise False.
    """
    if not self._DUT_SENSOR_CHECK_LIST:
      # Nothing configured to check: treat the board as present.
      logging.info('No dut sensor...')
      return True

    dut_sensor_list = list(self._DUT_SENSOR_CHECK_LIST)
    dut_sensor_status = self._servo.MultipleIsOn(dut_sensor_list)
    return dut_sensor_status == self._DUT_SENSOR_CHECK_LIST

  @TimeClassMethodDebug
  def _HandleStopFixture(self, show_state=True):
    """Stop Fixture Step.

    Disables the battery, then keeps disabling FIXTURE_PUSH_NEEDLE until both
    FB1 and FB3 feedbacks read on, clears the pending start action and shows
    the WAIT state.

    Args:
      show_state: True to show the OPENING state before starting.
    """
    logging.info('Stopping fixture...')
    if show_state:
      self._SetState(self._FixtureState.OPENING)

    # Disable battery first for safety.
    self._servo.Disable(self._CONTROL.BATTERY)

    # NOTE(review): this loop re-queries servod back to back with no delay
    # and no timeout; it only exits once FB1 and FB3 are both on.
    while True:
      feedback_status = self._servo.MultipleIsOn(self._FEEDBACK_LIST)

      if (not feedback_status[self._FIXTURE_FEEDBACK.FB1] or
          not feedback_status[self._FIXTURE_FEEDBACK.FB3]):
        self._servo.Disable(self._CONTROL.FIXTURE_PUSH_NEEDLE)
        continue

      self._starting_fixture_action = None
      logging.info('[Fixture stopped]')
      break

    self._SetState(self._FixtureState.WAIT)

  @TimeClassMethodDebug
  def _HandleStartFixtureFeedbackChange(self, feedback_status):
    """Processing Start Fixture feedback information.

    Args:
      feedback_status: dict of feedback name to on/off value, as returned by
          servo MultipleIsOn(_FEEDBACK_LIST).
    """
    if (self._starting_fixture_action is not None and
        self._starting_fixture_action != ActionType.FIXTURE_STARTED):
      # we are closing the fixture, check if we detect a hand
      if feedback_status[self._FIXTURE_FEEDBACK.FB5]:
        # detect hand, abort
        self._HandleStopFixture()
        return

    if self._servo.IsOn(self._BUTTON.FIXTURE_START):
      # FB2 and FB4 both on while pushing the needle means the cover closed.
      if (self._starting_fixture_action == ActionType.PUSH_NEEDLE and
          feedback_status[self._FIXTURE_FEEDBACK.FB2] and
          feedback_status[self._FIXTURE_FEEDBACK.FB4]):
        logging.info('[HandleStartFixture] fixture closed')
        self._starting_fixture_action = ActionType.FIXTURE_STARTED
        self._SetState(self._FixtureState.CLOSED)

  @TimeClassMethodDebug
  def _HandleStartFixture(self):
    """Start Fixture Step.

    Does nothing when the fixture is already started or when the last
    recorded FB5 feedback is on (hand detected). Otherwise, on the first
    call of a sequence it checks that the MLB is present, resets Whale and
    Dolphin devices and enters the CLOSING state; then it enables
    FIXTURE_PUSH_NEEDLE.
    """
    logging.info('[Fixture Start ...]')

    if self._starting_fixture_action == ActionType.FIXTURE_STARTED:
      logging.info('[HandleStartFixture] ACTION = FIXTURE_STARTED')
      return

    if self._last_feedback[self._FIXTURE_FEEDBACK.FB5]:
      logging.info('[HandleStartFixture] Detect Hands, stop..')
      return

    if self._starting_fixture_action is None:
      if not self._IsMLBInFixture():
        logging.info(
            '[HandleStartFixture] OOPS! Cannot close cover without MLBs')
        self._SetState(self._FixtureState.ERR_CLOSING)
        return

      self._ResetWhaleDeviceBeforeClosing()
      self._ResetDolphinDeviceBeforeClosing()
      self._starting_fixture_action = ActionType.PUSH_NEEDLE
      self._SetState(self._FixtureState.CLOSING)

    if self._starting_fixture_action == ActionType.PUSH_NEEDLE:
      logging.info('[HandleStartFixture] pushing needle')
      self._servo.Enable(self._CONTROL.FIXTURE_PUSH_NEEDLE)

  @TimeClassMethodDebug
  def _ResetWhaleDeviceBeforeClosing(self):
    """Resets devices on Whale if necessary before closing fixture."""
    # Release DUT CC2 pull-high
    self._servo.Disable(self._CONTROL.DC)
    self._servo.Disable(self._CONTROL.OUTPUT_RESERVE_1)

  @TimeClassMethodDebug
  def _ResetDolphinDeviceBeforeClosing(self):
    """Resets Dolphin if necessary before closing fixture."""
    if self._dolphin is None:
      return

    # Set dolphin to discharging mode, if dolphin is charging, DUT will fail to
    # boot up after battery connection.
    # Assuming all serial connections are connected to Dolphin.
    serial_amount = self._dolphin.GetSerialAmount()
    for serial_index in range(serial_amount):
      self._dolphin.Send(serial_index, 'usbc_action dev')

  @TimeClassMethodDebug
  def _ToggleBattery(self):
    """Toggles battery status.

    If battery is on, switches it to off and vise versa. A toggle requested
    less than _BATTERY_CEASE_TOGGLE_SECS after the previous one is ignored.
    """
    if (time.time() - self._last_battery_toggle_time <
        self._BATTERY_CEASE_TOGGLE_SECS):
      logging.debug('Toggle too fast, cease toggle for %f second.',
                    self._BATTERY_CEASE_TOGGLE_SECS)
      return

    new_battery_status = ('off' if self._servo.IsOn(self._CONTROL.BATTERY)
                          else 'on')
    logging.info('[Toggle battery to %s]', new_battery_status)
    self._servo.Set(self._CONTROL.BATTERY, new_battery_status)
    self._last_battery_toggle_time = time.time()

  @TimeClassMethodDebug
  def ScanButton(self):
    """Scans all buttons and invokes button click handler for clicked buttons.

    Returns:
      True if a button is clicked.
    """
    logging.debug('[Scanning button....]')
    status = self._servo.MultipleIsOn(self._BUTTON_LIST)

    # FIXTURE_STOP takes priority over every other button.
    if status[self._BUTTON.FIXTURE_STOP]:
      logging.info('Calling _HandleStopFixture because FIXTURE_STOP is True.')
      self._HandleStopFixture()
      # Disable stop button, and use 'i2cset' to set it back to input mode.
      self._servo.Disable(self._BUTTON.FIXTURE_STOP)
      process_utils.Spawn(['i2cset', '-y', '1', '0x77', '0x07', '0xff'])
      return True

    # A close sequence is in progress but FIXTURE_START is no longer on:
    # abort the sequence and reopen.
    if (self._starting_fixture_action != ActionType.FIXTURE_STARTED and
        self._starting_fixture_action is not None and
        not status[self._BUTTON.FIXTURE_START]):
      logging.info('Calling _HandleStopFixture because FIXTURE_START is False.')
      self._HandleStopFixture()
      return False

    button_clicked = any(status.values())
    if not button_clicked:
      return False

    operator_mode = not self._servo.IsOn(self._WHALE_DEBUG_MODE_EN)
    for button, clicked in status.items():
      if not clicked:
        continue

      if operator_mode and button not in self._OPERATOR_BUTTON_LIST:
        logging.debug('Supress button %s click because debug mode is off.',
                      button)
        continue

      if button == self._BUTTON.FIXTURE_START:
        if self._starting_fixture_action == ActionType.FIXTURE_STARTED:
          logging.info('[START] ACTION = FIXTURE_STARTED')
        else:
          self._HandleStartFixture()
      elif button == self._BUTTON.RESERVE_1:
        self._ToggleBattery()

      logging.info('Button %s clicked', button)
    return button_clicked

  @TimeClassMethodDebug
  def ScanFeedback(self):
    """Scans all feedback and invokes handler for those changed feedback.

    Returns:
      True if any feedback value has changed since the last scan.
    """
    logging.debug('[Scanning feedback....]')
    feedback_status = self._servo.MultipleIsOn(self._FEEDBACK_LIST)
    feedback_changed = False
    for name, value in feedback_status.items():
      if self._last_feedback[name] == value:
        continue

      # The handler runs before _last_feedback is updated, so it still sees
      # the previous value for this feedback there.
      self._HandleStartFixtureFeedbackChange(feedback_status)
      logging.info('Feedback %s value changed to %r', name, value)
      self._last_feedback[name] = value
      feedback_changed = True
    return feedback_changed

  @TimeClassMethodDebug
  def ResetLatch(self):
    """Resets SR latch for buttons."""
    self._servo.Click(self._CONTROL.INPUT_RESET)

  @TimeClassMethodDebug
  def WaitForInterrupt(self):
    """Polls the interrupt GPIO for a falling edge and logs the outcome.

    Waits up to the polling_wait_secs value given to the constructor.
    """
    logging.debug('Polling interrupt (GPIO %d %s) for %r seconds',
                  self._INPUT_INTERRUPT_GPIO, self._poll.GPIO_EDGE_FALLING,
                  self._polling_wait_secs)
    if self._poll.Poll(self._INPUT_INTERRUPT_GPIO,
                       self._poll.GPIO_EDGE_FALLING,
                       self._polling_wait_secs):
      logging.debug('Interrupt polled')
    else:
      logging.debug('Polling interrupt timeout')

  @TimeClassMethodDebug
  def ResetInterrupt(self):
    """Resets I/O expanders' interrupt.

    We have four I/O expanders (TCA9539), three of them have inputs. As
    BeagleBone can only accept one interrupt, we cascade two expanders'
    (0x75, 0x77) INT to 0x76 input pins. So any input changes from 0x75, 0x76,
    0x77 will generate interrupt to BeagleBone.

    According to TCA9539 manual:
        "resetting the interrupt circuit is achieved when data on the port is
         changed to the original setting or data is read from the port that
         generated the interrupt. ... Because each 8-bit port is read
         independently, the interrupt caused by port 0 is not cleared by a read
         of port 1, or vice versa",
    to reset interrupt, we need to read each changing bit. However, as servod
    reads a byte each time we read an input pin, so we only need to read 0x77
    byte-0, byte-1, 0x75 byte-1, and 0x76 byte-0, byte-1, in sequence to reset
    INT. The reason to read in sequence is that we need to read 0x76 at last
    as 0x77 and 0x75 INT reset could change P02 and P03 pin in 0x76.
    """
    # Touch I/O expander 0x77 byte 0 & 1, 0x75 byte 1, 0x76 byte 0 & 1.
    # Note that we skip I/O expander 0x75 byte-0 as it contains no input
    # pin, won't trigger interrupt.
    self._servo.MultipleGet([
        self._FIXTURE_FEEDBACK.FB1, self._BUTTON.FIXTURE_START,
        self._PLANKTON_FEEDBACK.FB1, self._WHALE_DEBUG_MODE_EN,
        self._BUTTON.RESERVE_1])

  def Run(self):
    """Waits for Whale's button click interrupt and dispatches it."""
    while True:
      button_clicked = self.ScanButton()
      feedback_changed = self.ScanFeedback()
      # The reason why we don't poll interrupt right after reset latch is that
      # it might be possible that a button is clicked after latch is cleared
      # but before I/O expander is touched. In this case, the button is latched
      # but the interrupt is consumed (after touching I/O expander) so that the
      # following click of that button won't trigger interrupt again, and
      # polling is blocked.
      #
      # The solution is to read button again without waiting for interrupt.
      if button_clicked or feedback_changed:
        if button_clicked:
          self.ResetLatch()
        self.ResetInterrupt()
        continue

      self.WaitForInterrupt()

  def ShowNucIpOnLED(self):
    """Shows NUC dongle IP on LED second line.

    Runs get_dongle_eth.sh on the NUC over ssh to find the dongle interface,
    then parses the first "inet a.b.c.d" address from its ifconfig output and
    writes it to LCM row 'r1'. Shows 'dongle not found...' when the interface
    or the address cannot be determined.
    """
    nuc_host = '192.168.234.1'
    testing_rsa_path = '/usr/local/factory/misc/sshkeys/testing_rsa'
    get_dongle_eth_script = (
        'timeout 1s /usr/local/factory/py/test/fixture/get_dongle_eth.sh')

    # Make identity file less open to make ssh happy
    os.chmod(testing_rsa_path, 0o600)
    ssh_command_base = ssh_utils.BuildSSHCommand(
        identity_file=testing_rsa_path)

    try:
      interface = process_utils.SpawnOutput(
          ssh_command_base + [nuc_host, get_dongle_eth_script]).strip()
    except Exception:
      # Best effort only. Catch Exception rather than BaseException so that
      # KeyboardInterrupt and SystemExit still reach the top-level handler.
      logging.debug('Failed to query NUC dongle interface', exc_info=True)
      interface = None

    if not interface:
      ip_address = 'dongle not found...'
    else:
      ifconfig_command = 'ifconfig %s' % interface
      ifconfig_result = process_utils.SpawnOutput(
          ssh_command_base + [nuc_host, ifconfig_command]).strip()
      ip_matcher = re.search(r'inet (\d+\.\d+\.\d+\.\d+)', ifconfig_result,
                             re.MULTILINE)
      if not ip_matcher:
        ip_address = 'dongle not found...'
      else:
        ip_address = ip_matcher.group(1)

    self._servo.MultipleSet([(self._CONTROL.LCM_ROW, 'r1'),
                             (self._CONTROL.LCM_TEXT, ip_address)])


def ParseArgs():
  """Parses command line arguments.

  Returns:
    args from argparse.parse_args().
  """
  description = (
      'Handle Whale button click event.'
  )

  parser = argparse.ArgumentParser(
      formatter_class=argparse.RawTextHelpFormatter, description=description)
  parser.add_argument('-d', '--debug', action='store_true', default=False,
                      help='enable debug messages')
  parser.add_argument('--rpc_debug', action='store_true', default=False,
                      help='enable debug messages for XMLRPC call')
  parser.add_argument('--nouse_dolphin', action='store_false', default=True,
                      dest='use_dolphin', help='whether to skip dolphin control'
                      ' (remote server). default: %(default)s')
  parser.add_argument('--use_polld', action='store_true', default=False,
                      help='whether to use polld (for polling GPIO port on '
                      'remote server) or poll local GPIO port, default: '
                      '%(default)s')
  parser.add_argument('--host', default='127.0.0.1', type=str,
                      help='hostname of server, default: %(default)s')
  parser.add_argument('--dolphin_port', default=9997, type=int,
                      help='port that dolphin_server listens, default: '
                      '%(default)d')
  parser.add_argument('--polld_port', default=9998, type=int,
                      help='port that polld listens, default: %(default)d')
  parser.add_argument('--servod_port', default=9999, type=int,
                      help='port that servod listens, default: %(default)d')
  parser.add_argument('--polling_wait_secs', default=5, type=int,
                      help=('# seconds for polling button clicking event, '
                            'default: %(default)d'))
  return parser.parse_args()


def main():
  """Parses arguments, configures logging and runs the handler loop."""
  args = ParseArgs()
  logging.basicConfig(
      level=logging.DEBUG if args.debug else logging.INFO,
      format='%(asctime)s - %(levelname)s - %(message)s')

  # A None port tells InterruptHandler not to use that service.
  polld_port = args.polld_port if args.use_polld else None
  dolphin_port = args.dolphin_port if args.use_dolphin else None

  handler = InterruptHandler(args.host, polld_port, args.servod_port,
                             dolphin_port, args.rpc_debug,
                             args.polling_wait_secs)
  handler.Init()
  handler.Run()


if __name__ == '__main__':
  try:
    main()
  except KeyboardInterrupt:
    sys.exit(0)
  except gpio_utils.GpioManagerError as e:
    sys.stderr.write(str(e) + '\n')
    # NOTE: the listing this file was recovered from is truncated at this
    # point ('...'); any statements that followed are not shown.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Testify automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful