How to use _stream_event method in lisa

Best Python code snippet using lisa_python

InertialMeasurementUnits.py

Source:InertialMeasurementUnits.py Github

copy

Full Screen

import time
import serial
import struct
import ctypes
import queue

import numpy as np
import numpy.matlib

import multiprocessing as mp
import matplotlib.pyplot as plt
import mpl_toolkits.mplot3d as plt3d
from mpl_toolkits.mplot3d.art3d import Poly3DCollection

from .. import ns_sleep
from ..utils import Quaternion as quat
from . import AbstractBaseInput

class InertialMeasurementUnits(AbstractBaseInput):
    """ Python implementation of the inertial measurement unit net """
    MAX_INIT_RETRIES = 10

    def __init__(self, name = 'IMU', com = '/dev/ttyACM0', baud = 115200, chan = [ 0, 1, 2, 3, 4 ], srate = 50.0):
        """
        Constructor

        Parameters
        ----------
        name : str
            The unique device handle used to refer to the connected hardware
        com : str
            The named communication port for the serial connection (e.g. COM9 on Windows, /dev/ttyACM1 on linux)
        baud : int
            The communication baudrate for the IMU dongle (should be 115200)
        chan : iterable of ints
            The IMU peripheral IDs that should be attached
        srate : float
            The sampling rate used to poll the device

        Returns
        -------
        obj
            An InertialMeasurementUnits interface object

        Notes
        -----
        The constructor spawns the child polling process and blocks until that
        process signals that the connection attempt has finished.
        """
        # interface variables
        self._name = name
        self._channels = list( chan )           # copy, so the shared default list is never aliased
        self._channelcount = len( self._channels )
        self._speriod = 1.0 / srate

        # device variables (one quaternion, i.e. 4 floats, per channel)
        self._state = np.zeros( ( 4 * self._channelcount, ) )
        self._calibrate = mp.Array( ctypes.c_float, 4 * self._channelcount )

        # initialize arrays with the [ 1, 0, 0, 0 ] quaternion for every channel
        # (np.float was removed from NumPy, so the builtin float is used)
        identity = np.tile( np.array( [ 1, 0, 0, 0 ], dtype = float ), self._channelcount )
        self._state[:] = identity
        self._calibrate[:] = identity

        # state variables
        self._state_buffer = mp.Queue()
        self._state_buffer_max = int( 5 * srate )

        # streaming variables
        self._exit_event = mp.Event()
        self._conn_event = mp.Event()
        self._stream_event = mp.Event()
        self._print_event = mp.Event()

        # viewing variables
        self._view_event = mp.Event()
        self._plot_buffer = mp.Queue()
        self._viewer = None

        self._ser = None
        self._streamer = mp.Process( target = self._connect, args = ( com, baud ) )
        self._streamer.start()
        self._conn_event.wait()

    def __del__(self):
        """
        Destructor

        This cleans up any spawned child processes / resources for the interface
        """
        try:
            # is_alive is a method: without the call the test is always truthy
            if self._streamer.is_alive():
                self._stream_event.clear()
                self._exit_event.set()
                self._streamer.join()
        except AttributeError: pass # never got to make the I/O process
        try:
            if self._viewer.is_alive(): self.hide()
        except AttributeError: pass # no viewer exists currently

    def _set_channels(self):
        """
        Sets active channels for the dongle

        Returns
        -------
        bool
            True if channels are properly set, False else
        """
        mask = b'\x80\x40\x20\x10\x08\x04\x02\x01'
        cmd = bytearray( b'\x63\x00' )
        for i in self._channels:
            cmd[1] |= mask[i]
        self._ser.write( cmd )
        ch = self._ser.read( 1 )
        if len( ch ) == 0: return False     # read timed out
        return struct.unpack( 'B', ch )[0] == len( self._channels )

    def _query_init(self):
        """
        Checks for proper initialization

        Returns
        -------
        bool
            True if the dongle answered the init query with 121 within
            MAX_INIT_RETRIES attempts, False else
        """
        for _ in range( InertialMeasurementUnits.MAX_INIT_RETRIES ):
            self._ser.write( b'\x69' )
            init = self._ser.read( 1 )
            # an empty read means the serial timeout expired; retry instead of
            # letting struct.unpack raise on zero bytes
            if len( init ) and struct.unpack( 'B', init )[0] == 121: return True
            time.sleep( 0.2 ) # wait 200 ms before asking again
        return False

    def _chksum(self, b):
        """
        Verifies data communication integrity by comparing a checksum of the data with the last byte

        Parameters
        ----------
        b : bytes
            Incoming data with a checksum byte appended

        Returns
        -------
        bool
            True if checksum matches, False else (including for empty input)
        """
        if len( b ) == 0: return False
        return ( sum( bytearray( b[:-1] ) ) % 256 ) == b[-1]

    def _connect(self, com, baud):
        """
        Connect to specified IMU devices

        Parameters
        ----------
        com : str
            The serial port that the IMU is connected to
        baud : int
            The communication baudrate for the serial device

        Raises
        ------
        RuntimeError
            If the IMU was not initialized correctly
        ValueError
            If the IMU channels were not set correctly

        Notes
        -----
        This function is the target of the the child polling process
        """
        try:
            self._ser = serial.Serial( com, baud, timeout = 0.5 )   # connect to serial device
            if not self._set_channels():                            # set up channel information
                raise ValueError( "IMU channels were not set correctly", self._channels )
            if not self._query_init():                              # query initialization success
                raise RuntimeError( "IMU was not initialized correctly" )
            self._conn_event.set()                                  # signal connection

            # streaming: block on the stream flag with a short timeout instead
            # of spinning, so an idle interface does not burn a CPU core while
            # the exit flag is still checked regularly
            while not self._exit_event.is_set():
                if self._stream_event.wait( timeout = 0.05 ):
                    self._stream()
        finally:
            # close serial port (if we opened it)
            if self._ser is not None and self._ser.is_open:
                self._ser.close()

            # drain all processing queues so we can join
            self.flush()
            empty = False
            while not empty:
                try: self._plot_buffer.get( timeout = 1e-3 )
                except queue.Empty: empty = True

            # signal connection so main process isn't held up
            self._conn_event.set()

    def _read(self):
        """
        Reads a single sample from the IMUs

        While this function does not return anything, it sets the _state variable
        to the last measured sensor readings and pushes a copy onto the state buffer.
        Samples that are short (serial timeout) or fail the checksum are discarded.
        """
        n_bytes = 16 * self._channelcount + 1   # 4 floats per channel + checksum byte
        self._ser.write( b'\x77' )
        sample = self._ser.read( n_bytes )
        if len( sample ) != n_bytes or not self._chksum( sample ):
            self._ser.flushInput()
            return

        data = np.array( struct.unpack( 4 * self._channelcount * 'f', sample[:-1] ) )
        for i in range( self._channelcount ):
            idx1 = i * 4
            idx2 = idx1 + 4
            self._state[idx1:idx2] = quat.relative( np.array( self._calibrate[idx1:idx2] ), data[idx1:idx2] )

        # drop the oldest samples once the buffer is over capacity; a consumer
        # may empty the queue between qsize() and get(), so tolerate Empty
        while self._state_buffer.qsize() > self._state_buffer_max:
            try: self._state_buffer.get( timeout = 1e-3 )
            except queue.Empty: break
        self._state_buffer.put( self._state.copy(), timeout = 1e-3 )

        if self._print_event.is_set(): print( self._name, ':', self._state )
        # queue a copy: the queue pickles in a background thread and _state is
        # overwritten in place by the next read
        if self._view_event.is_set(): self._plot_buffer.put( self._state.copy() )

    def _stream(self):
        """
        Streams data from the IMUs at the specified sampling rate

        Notes
        -----
        This function is called from the child polling process
        """
        t = time.time()
        while self._stream_event.is_set():
            t = t + self._speriod
            self._read()
            while max( t - time.time(), 0 ): ns_sleep( 1e2 )

    def _plot(self):
        """
        Generates a visualization of the incoming data in real-time

        One cube is drawn per channel and rotated by the latest quaternion
        taken from the plot buffer until the view event is cleared.
        """
        # initialization
        cube_x = np.array( [ [ 0, 1, 1, 0, 0, 0 ], [ 1, 1, 0, 0, 1, 1 ],
                             [ 1, 1, 0, 0, 1, 1 ], [ 0, 1, 1, 0, 0, 0 ] ] ) - 0.5
        cube_y = np.array( [ [ 0, 0, 1, 1, 0, 0 ], [ 0, 1, 1, 0, 0, 0 ],
                             [ 0, 1, 1, 0, 1, 1 ], [ 0, 0, 1, 1, 1, 1 ] ] ) - 0.5
        cube_z = np.array( [ [ 0, 0, 0, 0, 0, 1 ], [ 0, 0, 0, 0, 0, 1 ],
                             [ 1, 1, 1, 1, 0, 1 ], [ 1, 1, 1, 1, 0, 1 ] ] ) - 0.5
        cube_colors = 'rgbycm'

        # subplot grid sizes must be integers
        n_rows = int( np.ceil( np.sqrt( self._channelcount ) ) )
        n_cols = n_rows

        gui = plt.figure()
        # the window title lives on the figure manager (the canvas-level
        # set_window_title was removed from Matplotlib)
        manager = gui.canvas.manager
        if manager is not None: manager.set_window_title( self._name )

        polygons = []
        for i in range( self._channelcount ):
            polygons.append( [] )
            ax = gui.add_subplot( n_rows, n_cols, i + 1,
                                  projection = '3d', aspect = 'equal' )
            for side in range( 6 ):
                vtx = np.array( [ cube_x[:, side],
                                  cube_y[:, side],
                                  cube_z[:, side] ] )
                poly = Poly3DCollection( [ np.transpose( vtx ) ] )
                poly.set_color( cube_colors[ side ] )
                poly.set_edgecolor( 'k' )
                polygons[ i ].append( poly )

                ax.add_collection3d( polygons[ i ][ side ] )
            ax.set_xlim( ( -1, 1 ) )
            ax.set_ylim( ( -1, 1 ) )
            ax.set_zlim( ( -1, 1 ) )
            ax.set_title( 'IMU #' + repr( self._channels[ i ] + 1 ) )
            ax.axis( 'off' )

        # stream
        plt.tight_layout()
        plt.show( block = False )
        xnew = np.zeros( ( 4, 6 ) )
        ynew = np.zeros( ( 4, 6 ) )
        znew = np.zeros( ( 4, 6 ) )
        while self._view_event.is_set():
            try:
                # keep only the most recent sample
                data = None
                while self._plot_buffer.qsize() > 0: data = self._plot_buffer.get()
                if data is not None:
                    for dev in range( self._channelcount ):
                        idx1 = dev * 4
                        idx2 = idx1 + 4
                        q = data[idx1:idx2]
                        for j in range( 6 ):
                            for i in range( 4 ):
                                p = np.array( [ cube_x[ i, j ],
                                                cube_y[ i, j ],
                                                cube_z[ i, j ] ] )
                                pr = quat.rotate( q, p )
                                xnew[ i, j ] = pr[ 0 ]
                                ynew[ i, j ] = pr[ 1 ]
                                znew[ i, j ] = pr[ 2 ]
                            vtx = np.array( [ xnew[:, j], ynew[:, j], znew[:, j] ] )
                            polygons[ dev ][ j ].set_verts( [ np.transpose( vtx ) ] )
                plt.pause( 0.05 )
            # best effort: any failure (closed window, Ctrl-C) ends the viewer
            except ( Exception, KeyboardInterrupt ): self._view_event.clear()
        plt.close( gui )

    @property
    def name(self):
        """
        The unique handle for the device

        Returns
        -------
        str
            The specified unique handle of this device interface
        """
        return self._name

    @property
    def state(self):
        """
        The current state for the device

        Returns
        -------
        numpy.ndarray (4 x n_channels,) or None
            The oldest buffered sensor reading, or None if nothing arrived within 1 ms
        """
        try:
            return self._state_buffer.get( timeout = 1e-3 )
        except queue.Empty:
            return None

    @property
    def speriod(self):
        """
        The sampling period for the device

        Returns
        -------
        float
            The sampling period of the device
        """
        return self._speriod

    @property
    def channelcount(self):
        """
        The channel count for the device

        Returns
        -------
        int
            The number of attached IMU channels
        """
        return self._channelcount

    def set_calibrate(self, calibration_count = 100):
        """
        Set the calibration quaternions for the IMUs

        Parameters
        ----------
        calibration_count : int
            The number of quaternions to sample to get a baseline

        Returns
        -------
        bool
            True if calibration was successful, False else

        Notes
        -----
        Streaming is switched on for the duration of the calibration and restored
        to its previous state afterwards. This call blocks until
        calibration_count samples have been received.
        """
        running = self._stream_event.is_set()
        self._stream_event.set()

        Q = []
        while len( Q ) < calibration_count:
            q = self.state
            if q is not None: Q.append( q )

        if not running: self._stream_event.clear()
        if not Q: return False                  # nothing requested / collected

        Q = np.vstack( Q ).T                    # quaternions x samples
        for i in range( self._channelcount ):
            qidx = 4 * i
            self._calibrate[qidx:qidx+4] = quat.average( Q[qidx:qidx+4,:] )
        return True

    def clear_calibrate(self):
        """
        Clear the calibration quaternions for the IMUs
        """
        self._calibrate[:] = np.tile( np.array( [ 1, 0, 0, 0 ], dtype = float ), self._channelcount )

    def get_calibrate(self):
        """
        Get the current calibration quaternions for the IMUs

        Returns
        -------
        numpy.ndarray (n_channels x 4,)
            A copy of the calibration orientations for each sensor
        """
        return np.frombuffer( self._calibrate.get_obj(), np.float32 ).copy()

    def run(self, display = False):
        """
        Starts the acquisition process of the IMUs

        Parameters
        ----------
        display : bool
            Flag determining whether sensor measurements should be printed to console (True) or not (False)
        """
        if not self._stream_event.is_set():
            self._stream_event.set()

        if display: self._print_event.set()
        else: self._print_event.clear()

    def flush(self):
        """
        Dispose of all previous data
        """
        empty = False
        while not empty:
            try: self._state_buffer.get( timeout = 1e-3 )
            except queue.Empty: empty = True

    def stop(self):
        """
        Stops the acquisition process of the IMUs
        """
        if self._stream_event.is_set():
            self._stream_event.clear()

    def view(self):
        """
        Launches the GUI viewer of the IMU data
        """
        if not self._view_event.is_set():
            self._view_event.set()
            self._viewer = mp.Process( target = self._plot )
            self._viewer.start()

    def hide(self):
        """
        Closes the GUI viewer of the IMU data
        """
        if self._view_event.is_set():
            self._view_event.clear()
            self._viewer.join()

if __name__ == '__main__':
    import sys
    import inspect
    import argparse

    # helper function for booleans
    def str2bool( v ):
        if v.lower() in [ 'yes', 'true', 't', 'y', '1' ]: return True
        elif v.lower() in [ 'no', 'false', 'n', 'f', '0' ]: return False
        else: raise argparse.ArgumentTypeError( 'Boolean value expected!' )

    # parse commandline entries
    # (inspect.getargspec was removed from Python; getfullargspec exposes the same fields)
    class_init = inspect.getfullargspec( InertialMeasurementUnits.__init__ )
    arglist = class_init.args[1:]   # first item is always self
    defaults = class_init.defaults
    parser = argparse.ArgumentParser()
    for arg, default in zip( arglist, defaults ):
        # element type for sequences / strings, value type for scalars
        try: tgt_type = type( default[ 0 ] )
        except ( TypeError, IndexError ): tgt_type = type( default )
        if tgt_type is bool:
            parser.add_argument( '--' + arg,
                                 type = str2bool, nargs = '?',
                                 action = 'store', dest = arg,
                                 default = default )
        else:
            parser.add_argument( '--' + arg,
                                 type = tgt_type, nargs = '+',
                                 action = 'store', dest = arg,
                                 default = default )
    args = parser.parse_args()
    for arg, default in zip( arglist, defaults ):
        attr = getattr( args, arg )
        # nargs = '+' always yields a list; unwrap it for scalar parameters
        if isinstance( attr, list ) and not isinstance( default, list ):
            setattr( args, arg, attr[ 0 ] )

    # create interface
    imu = InertialMeasurementUnits( name = args.name, com = args.com, baud = args.baud, chan = args.chan, srate = args.srate )
    imu.run( display = False )
    imu.view()

    while True:
        state = imu.state
        # NOTE: the listing is truncated here in the source; body left as-is
        if state is not None: ...

Full Screen

Full Screen

CyberGlove.py

Source:CyberGlove.py Github

copy

Full Screen

import serial
import struct
import ctypes
import time
import queue

import numpy as np
import multiprocessing as mp

from .. import ns_sleep
from . import AbstractBaseInput

class CyberGlove(AbstractBaseInput):
    """ Python implementation of a Wired CyberGlove driver """
    def __init__(self, name = 'CyberGlove', com = '/dev/ttyUSB0', baud = 115200, srate = 50.0 ):
        """
        Constructor

        Parameters
        ----------
        name : str
            The unique device handle used to refer to the connected hardware
        com : str
            The named communication port for the serial connection (e.g. COM9 on Windows, /dev/ttyUSB0 on linux)
        baud : int
            The communication baudrate for the CyberGlove (should be 115200)
        srate : float
            The sampling rate used to poll the device (maximum is 90 Hz)

        Returns
        -------
        obj
            A CyberGlove interface object

        Raises
        ------
        ValueError
            If the CyberGlove returns an invalid channel count
        RuntimeError
            If the CyberGlove was not initialized correctly
        """
        # interface variables
        self._name = name
        self._speriod = 1.0 / srate

        # we need to query the CyberGlove quickly
        self._ser = serial.Serial( com, baud, timeout = 0.5 )
        try:
            if not self._query_init():
                raise RuntimeError("CyberGlove was not initialized correctly")
            ch = self._query_channels()
            if ch <= 0:
                raise ValueError("CyberGlove channel count must be positive", 'ch')
            self._channelcount = ch
        finally:
            # release serial resource so child process can own it
            # (also on failure, so the port is not leaked)
            if self._ser.is_open: self._ser.close()
            self._ser = None

        # state variables
        self._state = np.zeros( ( self._channelcount, ) )
        self._state_buffer = mp.Queue()
        self._state_buffer_max = int( 5 * srate )

        # calibration layout: first half holds per-sensor maxima, second half minima
        self._calibrate = mp.Array( ctypes.c_double, 2 * self._channelcount )
        self._calibrate[:self._channelcount] = 255 * np.ones( self._channelcount )

        # streaming variables
        self._exit_event = mp.Event()
        self._conn_event = mp.Event()
        self._stream_event = mp.Event()
        self._print_event = mp.Event()

        # viewing variables
        self._view_event = mp.Event()
        self._plot_buffer = mp.Queue()
        self._viewer = None

        # connect to hardware
        self._streamer = mp.Process( target = self._connect, args = ( com, baud ) )
        self._streamer.start()
        self._conn_event.wait()

    def __del__(self):
        """
        Destructor

        This cleans up any spawned child processes / resources for the interface
        """
        try:
            # is_alive is a method: without the call the test is always truthy
            if self._streamer.is_alive():
                print( '', end = '', flush = True ) # TODO: WHY DOES THIS WORK?
                self._stream_event.clear()
                self._exit_event.set()
                self._streamer.join()
        except AttributeError: pass # never got to make the I/O process
        try:
            if self._viewer.is_alive(): self.hide()
        except AttributeError: pass # no viewer exists currently

    def _query_init(self):
        """
        Checks for proper device initialization

        Returns
        -------
        bool
            True if properly initialized, False else
        """
        self._ser.write(b'\x3F\x47')
        init = self._ser.read(4)
        if len(init) < 4: return False
        return init[2] == 3

    def _query_channels(self):
        """
        Checks for number of channels the CyberGlove has

        Returns
        -------
        int
            Number of CyberGlove sensors (0 if the query timed out)
        """
        self._ser.write(b'\x3F\x53')
        ch = self._ser.read( 4 )
        if len(ch) < 4: return 0
        return int( ch[2] )

    def _connect(self, com, baud):
        """
        Connect to specified CyberGlove device

        Parameters
        ----------
        com : str
            The serial port that the CyberGlove is connected to
        baud : int
            The communication baudrate for the serial device

        Raises
        ------
        RuntimeError
            If the CyberGlove was not initialized correctly

        Notes
        -----
        This function is the target of the the child polling process
        """
        try:
            self._ser = serial.Serial(com, baud, timeout = 0.5)     # connect to serial device
            if not self._query_init():                              # query initialization success
                raise RuntimeError("CyberGlove was not initialized correctly")
            self._conn_event.set()                                  # signal connection

            # streaming: block on the stream flag with a short timeout instead
            # of spinning, so an idle interface does not burn a CPU core while
            # the exit flag is still checked regularly
            while not self._exit_event.is_set():
                if self._stream_event.wait( timeout = 0.05 ):
                    self._stream()
        finally:
            # close serial port (if we opened it)
            if self._ser is not None and self._ser.is_open:
                self._ser.close()

            # drain all processing queues so we can join
            self.flush()
            empty = False
            while not empty:
                try: self._plot_buffer.get( timeout = 1e-3 )
                except queue.Empty: empty = True

            # signal connection so main process isn't held up
            self._conn_event.set()

    def _read(self):
        """
        Reads a single sample from the CyberGlove

        While this function does not return anything, it sets the _state variable
        to the last measured sensor readings, scaled by the calibration range.
        Short reads (serial timeout) are discarded.

        Notes
        -----
        For the Wired CyberGlove, channels correspond to the following sensors:
            00 Thumb MCP    01 Thumb PIP    02 Thumb DIP    03 Thumb ABD
            04 Index MCP    05 Index PIP    06 Index DIP    07 Index ABD
            08 Middle MCP   09 Middle PIP   10 Middle DIP   11 Middle ABD
            12 Ring MCP     13 Ring PIP     14 Ring DIP     15 Ring ABD
            16 Pinky MCP    17 Pinky PIP    18 Pinky DIP    19 Pinky ABD
            20 Palm Arch    21 Wrist Pitch  22 Wrist Yaw
        """
        n_bytes = self._channelcount + 2        # one byte per sensor plus a leading and a trailing byte
        self._ser.write(b'\x47')
        sample = self._ser.read( n_bytes )
        if len( sample ) != n_bytes:
            self._ser.flushInput()
            return
        values = np.array( struct.unpack( self._channelcount*'B', sample[1:-1] ), dtype = float )

        vmin = np.array( self._calibrate[ -self._channelcount: ] )
        vmax = np.array( self._calibrate[ :self._channelcount ] )

        # a sensor that never moved during calibration has vmax == vmin;
        # report 0 for it instead of dividing by zero
        span = vmax - vmin
        self._state[:] = np.divide( values - vmin, span, out = np.zeros_like( values ), where = span != 0 )

        # drop the oldest samples once the buffer is over capacity; a consumer
        # may empty the queue between qsize() and get(), so tolerate Empty
        while self._state_buffer.qsize() > self._state_buffer_max:
            try: self._state_buffer.get( timeout = 1e-3 )
            except queue.Empty: break
        self._state_buffer.put( self._state.copy(), timeout = 1e-3 )

        if self._print_event.is_set(): print( self._name, ':', self._state )
        # queue a copy: the queue pickles in a background thread and _state is
        # overwritten in place by the next read
        if self._view_event.is_set(): self._plot_buffer.put( self._state.copy() )

    def _stream(self):
        """
        Streams data from the CyberGlove at the specified sampling rate
        """
        t = time.time()
        while self._stream_event.is_set():
            t = t + self._speriod
            self._read()
            while max( t - time.time(), 0 ): ns_sleep( 1e2 )

    def _plot(self):
        """
        Generates a visualization of the incoming data in real-time

        Notes
        -----
        No viewer is implemented; this only clears the view event.
        """
        self._view_event.clear()

    @property
    def name(self):
        """
        The unique handle for the device

        Returns
        -------
        str
            The specified unique handle of this device interface
        """
        return self._name

    @property
    def state(self):
        """
        The current state for the device

        Returns
        -------
        numpy.ndarray (n_channels,) or None
            The oldest buffered sensor reading, or None if nothing arrived within 1 ms
        """
        try:
            return self._state_buffer.get( timeout = 1e-3 )
        except queue.Empty:
            return None

    @property
    def speriod(self):
        """
        The sampling period for the device

        Returns
        -------
        float
            The sampling period of the device
        """
        return self._speriod

    @property
    def channelcount(self):
        """
        The channel count for the device

        Returns
        -------
        int
            The number of channels per sensor measurement
        """
        return self._channelcount

    def set_calibrate(self, timeout = 10):
        """
        Sets the calibration values for the CyberGlove

        Calibration aims to record the maximum and minimum values of each sensor.
        These values are then used to scale subsequent readings between [0, 1].

        Parameters
        ----------
        timeout : float
            The amount of time (in seconds) to collect calibration data

        Notes
        -----
        The serial port is owned by the child polling process (this object's
        _ser is None in the parent after construction), so the raw readings are
        gathered through the streaming process: the calibration is reset to the
        raw [0, 255] range, streamed states are collected for `timeout` seconds
        and scaled back to raw sensor values. Streaming is restored to its
        previous state afterwards. If no sample arrives, the previous
        calibration is kept.
        """
        n = self._channelcount
        previous = list( self._calibrate[:] )
        running = self._stream_event.is_set()

        self.clear_calibrate()          # streamed states are now raw / 255
        self._stream_event.set()
        self.flush()                    # drop samples scaled by the old calibration

        samples = []
        t0 = time.time()
        while ( time.time() - t0 ) < timeout:
            sample = self.state
            if sample is not None:
                samples.append( np.rint( 255.0 * sample ) )

        if not running: self._stream_event.clear()

        if not samples:
            self._calibrate[:] = previous
            return

        samples = np.vstack( samples )
        self._calibrate[ :n ] = np.amax( samples, axis = 0 )
        self._calibrate[ -n: ] = np.amin( samples, axis = 0 )

    def clear_calibrate(self):
        """
        Clears the calibration values for this CyberGlove
        """
        self._calibrate[ -self._channelcount: ] = np.zeros( self._channelcount )
        self._calibrate[ :self._channelcount ] = 255 * np.ones( self._channelcount )

    def get_calibrate(self):
        """
        Get the current calibration values for this CyberGlove

        Returns
        -------
        numpy.ndarray (n_channels,)
            The maximum values for each sensor
        numpy.ndarray (n_channels,)
            The minimum values for each sensor

        Notes
        -----
        Copies are returned so callers cannot modify the shared calibration in place.
        """
        cal = np.frombuffer( self._calibrate.get_obj() )
        cal = np.reshape( cal, ( 2, self._channelcount ) )
        return cal[0,:].copy(), cal[1,:].copy()

    def run(self, display = False):
        """
        Starts the acquisition process of the CyberGlove

        Parameters
        ----------
        display : bool
            Flag determining whether sensor measurements should be printed to console (True) or not (False)
        """
        if not self._stream_event.is_set():
            self._stream_event.set()
        if display: self._print_event.set()
        else: self._print_event.clear()

    def flush(self):
        """
        Dispose of all previous data
        """
        empty = False
        while not empty:
            try: self._state_buffer.get( timeout = 1e-3 )
            except queue.Empty: empty = True

    def stop(self):
        """
        Stops the acquisition process of the CyberGlove
        """
        if self._stream_event.is_set():
            self._stream_event.clear()

    def view(self):
        """
        Launches the GUI viewer of the CyberGlove data

        Notes
        -----
        This is currently not implemented
        """
        if not self._view_event.is_set():
            self._view_event.set()
            self._viewer = mp.Process( target = self._plot )
            self._viewer.start()

    def hide(self):
        """
        Closes the GUI viewer of the CyberGlove data
        """
        if self._view_event.is_set():
            self._view_event.clear()
            self._viewer.join()

if __name__ == '__main__':
    import sys
    import inspect
    import argparse

    # helper function for booleans
    def str2bool( v ):
        if v.lower() in [ 'yes', 'true', 't', 'y', '1' ]: return True
        elif v.lower() in [ 'no', 'false', 'n', 'f', '0' ]: return False
        else: raise argparse.ArgumentTypeError( 'Boolean value expected!' )

    # parse commandline entries
    # (inspect.getargspec was removed from Python; getfullargspec exposes the same fields)
    class_init = inspect.getfullargspec( CyberGlove.__init__ )
    arglist = class_init.args[1:]   # first item is always self
    defaults = class_init.defaults
    parser = argparse.ArgumentParser()
    for arg, default in zip( arglist, defaults ):
        # element type for sequences / strings, value type for scalars
        try: tgt_type = type( default[ 0 ] )
        except ( TypeError, IndexError ): tgt_type = type( default )
        if tgt_type is bool:
            parser.add_argument( '--' + arg,
                                 type = str2bool, nargs = '?',
                                 action = 'store', dest = arg,
                                 default = default )
        else:
            parser.add_argument( '--' + arg,
                                 type = tgt_type, nargs = '+',
                                 action = 'store', dest = arg,
                                 default = default )
    args = parser.parse_args()
    for arg, default in zip( arglist, defaults ):
        attr = getattr( args, arg )
        # nargs = '+' always yields a list; unwrap it for scalar parameters
        if isinstance( attr, list ) and not isinstance( default, list ):
            setattr( args, arg, attr[ 0 ] )

    # create interface
    cg = CyberGlove( name = args.name, com = args.com, baud = args.baud, srate = args.srate )
    cg.run( display = False )
    cg.view()

    while True:
        state = cg.state
        # NOTE: the listing is truncated here in the source; body left as-is
        if state is not None: ...

Full Screen

Full Screen

run_camera_frame_producer.py

Source:run_camera_frame_producer.py Github

copy

Full Screen

import os
import threading
import signal
from functools import partial
from io import BytesIO
from typing import Dict
from multiprocessing import Process
import logging
from camera.camera_config import camera_config
from camera.manage_record import camera_record_factory
from camera.camera_frame_producer import CameraFrameProducer
from camera.pivideostream import PiVideoStream
from service_manager.runnable import Runnable
import multiprocessing as mp
CAMERA_WIDTH = camera_config.camera_width
CAMERA_HEIGHT = camera_config.camera_height
DEVICE_ID = os.environ['DEVICE_ID']
LOGGER = logging.getLogger(__name__)
class FrameProducer:
    """Feeds frames from a PiVideoStream into a CameraFrameProducer.

    The two events are read on every frame and passed on as the ``stream``
    and ``process`` flags of ``CameraFrameProducer.process_frame``.
    """
    def __init__(self, stream_event: mp.Event, process_event: mp.Event):
        """Store the events that are consulted for each frame."""
        self._stream_event = stream_event
        self._process_event = process_event
    def run(self, device_id: str) -> None:
        """Create the camera objects and run the video stream.

        ``device_id`` is handed to ``CameraFrameProducer``; the record factory
        is called with the module-level ``DEVICE_ID`` taken from the environment.
        """
        LOGGER.info('run camera frame producer')
        camera = CameraFrameProducer(device_id)
        stream = PiVideoStream(None, resolution=(
            CAMERA_WIDTH, CAMERA_HEIGHT), framerate=25)
        """
        For fps processing, we do not change directly FPS because we need to record at high fps.
        If the camera is recording and we try to change the camera fps we get this error:
        raise PiCameraRuntimeError("Recording is currently running")
        """
        # @rate_limited(max_per_second=0.5, thread_safe=False, block=True)
        def process_frame(frame: BytesIO):
            # the event states are sampled per frame, so toggling them takes
            # effect on the next frame
            camera.process_frame(frame, stream=self._stream_event.is_set(), process=self._process_event.is_set())
        stream.process_frame = process_frame
        camera_record_factory(DEVICE_ID, stream)
        stream.run()
class RunCameraFrameProducer(Runnable):
    """Runnable that starts/stops a FrameProducer in a child process."""
    def __init__(self):
        """Create the shared events and the (not yet started) frame producer."""
        self._stream_event = mp.Event()
        self._process_event = mp.Event()
        self._frame_producer = FrameProducer(self._stream_event, self._process_event)
        self._process = None
    def _process_join(self, process) -> None:
        """Wait for ``process`` and interrupt this program if it was not SIGTERM-ed.

        Runs in a watcher thread started by ``run``.
        """
        process.join()
        if process.exitcode != -signal.SIGTERM:
            # something went wrong in the child process
            # -> kill the process. Reminder: we are inside a thread here,
            # so it appears that sys.exit(1) does not work... So, we will!
            os.kill(os.getpid(), signal.SIGINT)
    def run(self, device_id: str, status: bool, data=None) -> None:
        """
        Update the stream/process flags from ``data`` and start or stop the child process.

        Be careful with falsy values! None does not mean to turn off the stream/process
        i.e data={'to_analyze': None, 'stream': False}
        do a strict equal to turn on/off something.
        """
        if data:
            if 'stream' in data:
                if data['stream'] is True:
                    self._stream_event.set()
                elif data['stream'] is False:
                    self._stream_event.clear()
            if 'to_analyze' in data:
                if data['to_analyze'] is True:
                    self._process_event.set()
                elif data['to_analyze'] is False:
                    self._process_event.clear()
        if status is True and self._process is None:
            run = partial(self._frame_producer.run, device_id)
            self._process = Process(target=run, daemon=True)
            # watcher thread joins the child and reacts to its exit code
            verify = partial(self._process_join, self._process)
            t = threading.Thread(target=verify, daemon=True)
            self._process.start()
            t.start()
            return
        if status is False and self._process:
            # terminate() sends SIGTERM, which _process_join treats as a normal stop
            self._process.terminate()
            self._process = None
    # NOTE(review): the listing is truncated at this point in the source
    def __str__(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful