Best Python code snippet using autotest_python
gl_visualizer.py
Source:gl_visualizer.py  
"""Realtime viewer for three signal channels read from a FieldTrip buffer.

The script connects to a FieldTrip buffer, polls it on a timer, keeps the
most recent samples in a :class:`RingBuffer`, and draws three signal rows
plus vertical peak markers with vispy/gloo.
"""
import sys
from argparse import ArgumentParser

import numpy as np
from vispy import app, gloo, keys

# The FieldTrip python client is not installable; it is picked up from one of
# these checkouts, so the path setup must precede the import.
sys.path.append('../../../../fieldtrip/realtime/src/buffer/python')
sys.path.append('../../fieldtrip/realtime/src/buffer/python')
sys.path.append('/Users/fraimondo/dev/fieldtrip/realtime/src/buffer/python')
import FieldTrip as ft

parser = ArgumentParser(description='Visualize QRS Exp from fieldtrip buffer')
parser.add_argument('--host', metavar='host', type=str, nargs=1,
                    default=['localhost'],
                    help='fieldtrip buffer host (default localhost)')
parser.add_argument('--port', metavar='port', type=int, nargs=1,
                    default=[1972],
                    help='fieldtrip buffer port (default 1972)')
# The help text used to be a copy of the --port one.
parser.add_argument('--scale', metavar='scale', type=int, nargs=1,
                    default=[8000],
                    help='signal amplitude scale (default 8000)')
parser.add_argument('--filter', dest='filter', action='store_true',
                    default=False,
                    help='Filter original data')
args = parser.parse_args()
host = args.host[0]
port = args.port[0]

if args.filter:
    # Optional dependency: only needed when filtering is requested.
    sys.path.append('./iir1/')
    import pyiir1 as iir


class RingBuffer(object):
    """Fixed-capacity 2-D sample store with cheap consumption from the front.

    Rows live in ``self.buffer``; the valid window is
    ``self.buffer[self.counter:self.counter + self.size]``.  Consuming only
    advances ``counter``; the data is moved back to the start of the storage
    (``compact``) only when an append would run past the end.
    """

    def __init__(self, max_size, rows=1, dtype=float):
        """Allocate storage for ``max_size`` samples of ``rows`` columns.

        ``dtype`` defaults to the builtin ``float`` (what the removed
        ``np.float`` alias used to point to).
        """
        self.size = 0
        self.max_size = max_size
        self.buffer = np.zeros((self.max_size, rows), dtype=dtype)
        self.counter = 0

    def append(self, data, auto=False):
        """Append ``data`` (1-D or 2-D, one sample per row) to the buffer.

        When there is not enough free capacity, ``auto=True`` drops the
        oldest samples to make room; otherwise ``RuntimeError`` is raised.
        If ``data`` alone exceeds the capacity (only possible with
        ``auto=True``), just its last ``max_size`` rows are kept.

        This is an O(n) operation.
        """
        n = len(data)
        free = self.max_size - self.size
        if n > free:
            if not auto:
                raise RuntimeError("Buffer Overflow")
            if n > self.max_size:
                # Consuming more than we hold would make ``size`` negative.
                data = data[-self.max_size:]
                n = self.max_size
            self.consume(n - free)
        if self.remaining < n:
            self.compact()
        block = np.asarray(data)
        if block.ndim < 2:
            # A flat vector is one value per sample; make it a column.
            block = block[:, None]
        begin = self.counter + self.size
        self.buffer[begin:begin + n] = block
        self.size += n

    def consume(self, n):
        """Drop the ``n`` oldest samples.

        Raises ``ValueError`` if ``n`` is negative or larger than the number
        of stored samples, since either would corrupt the window bookkeeping.
        """
        if n < 0 or n > self.size:
            raise ValueError(
                "cannot consume {} samples from a buffer holding {}".format(
                    n, self.size))
        self.counter += n
        self.size -= n

    @property
    def remaining(self):
        """Rows still free after the valid window, before compaction."""
        return self.max_size - (self.counter + self.size)

    def compact(self):
        """Move the valid window back to the start of the storage.

        note: only when this function is called, is an O(size)
        performance hit incurred,
        and this cost is amortized over the whole padding space
        """
        self.buffer[:self.size] = self.view
        self.counter = 0

    def ravel(self):
        """Return the valid window flattened to 1-D."""
        return self.view.ravel()

    @property
    def view(self):
        """The valid window as a numpy view; this is always an O(1) operation."""
        return self.buffer[self.counter:][:self.size]

    def cumsum(self, axis=None, dtype=None, out=None):
        """``np.cumsum`` over the valid window."""
        return np.cumsum(self.view, axis=axis, dtype=dtype, out=out)

    def mean(self, axis=None, dtype=None, out=None, keepdims=False):
        """``np.mean`` over the valid window."""
        return np.mean(self.view, axis=axis, dtype=dtype, out=out,
                       keepdims=keepdims)

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        """Return rows of the valid window as a 2-D view.

        ``index`` may be an integer (a one-row slice is returned) or a slice
        with an optional positive step.  Negative positions count from the
        end of the valid window.

        Raises:
            RuntimeError: if the requested range falls outside the window.
            ValueError: if the slice step is not positive.
            TypeError: for any other index type.
        """
        size = self.size
        step = None
        if isinstance(index, slice):
            start = 0 if index.start is None else index.start
            # An open-ended slice runs to the end of the window (it used to
            # default to 0, which made ``buf[2:]`` empty).
            stop = size if index.stop is None else index.stop
            step = index.step
            if step is not None and step < 1:
                raise ValueError("RingBuffer slices need a positive step")
            # Each bound wraps on its own; a non-negative stop must not be
            # shifted just because start was negative.
            if start < 0:
                start += size
            if stop < 0:
                stop += size
        elif isinstance(index, (int, np.integer)):
            start = index + size if index < 0 else index
            stop = start + 1
        else:
            raise TypeError(
                "RingBuffer indices must be integers or slices, not {}".format(
                    type(index).__name__))
        if stop > size:
            raise RuntimeError("Slice end cannot be bigger than size")
        if start < 0 or stop < 0:
            raise RuntimeError("Out of bounds")
        base = self.counter
        return self.buffer[base + start:base + stop:step]

    def __repr__(self):
        return repr(self.view)


print('Reading data from {}:{}'.format(host, port))
ftc = ft.Client()
ftc.connect(host, port)
H = ftc.getHeader()
if H is None:
    print('Failed to retrieve header!')
    sys.exit(1)
print(H)
print(H.labels)

# Config
sfreq = H.fSample
buffer_size = 4
MAX_SAMPS = 100000
x_scale_init = int(5 * sfreq)  # Number of samples to display
y_scale_init = float(args.scale[0])  # Amplitude
y_offset_init = 0  # -8190.0 # -200.0  # Offset
# End config

# Marker shift (in samples) applied when filtering, keyed by sampling rate.
DELAYS = {
    250.0: 12,
    500.0: 4,
    1000.0: 8,
    5000.0: 389,
}

SIGNAL_VERT_SHADER = """
#version 120
attribute float signal_pos;
attribute float index;
uniform int n_rows;
uniform int row;
uniform float width;
uniform float height;
uniform float y_scale;
uniform float y_offset;
void main (void)
{
    float offset = 1.0 - (2.0 / n_rows * (row + 0.5));
    gl_Position = vec4(index * 1.5 + 0.5,
                       ((signal_pos + y_offset) / y_scale) / n_rows + offset,
                       0.0, 1.0);
}
"""

SIGNAL_FRAG_SHADER = """
#version 120
uniform vec4 COLOR_MASKS[ 8 ] = vec4[] (vec4( 0.0, 0.0, 0.0, 1.0 ),
                                vec4( 1.0, 0.0, 0.0, 1.0 ),
                                vec4( 0.0, 1.0, 0.0, 1.0 ),
                                vec4( 0.0, 0.0, 1.0, 1.0 ),
                                vec4( 1.0, 1.0, 0.0, 1.0 ),
                                vec4( 1.0, 0.0, 1.0, 1.0 ),
                                vec4( 0.0, 1.0, 1.0, 1.0 ),
                                vec4( 1.0, 1.0, 1.0, 1.0 ) );
uniform int color;
void main()
{
    gl_FragColor = COLOR_MASKS[color];
}
"""

MARKER_VERT_SHADER = """
#version 120
attribute float markers;
attribute float y_pos;
uniform float start_samp;
uniform float width;
uniform float height;
uniform float x_scale;
void main (void)
{
    gl_Position = vec4((markers - start_samp)/x_scale * 1.5 + 0.5, y_pos, 0.0, 1.0);
}
"""

# The marker fragment shader was a verbatim copy of the signal one.
MARKER_FRAG_SHADER = SIGNAL_FRAG_SHADER


class Canvas(app.Canvas):
    """vispy canvas that polls the FieldTrip buffer and draws the traces.

    Three signal rows are drawn as line strips; four families of peak
    markers (``p_peaks``, ``d_peaks``, ``e_peaks``, ``m_peaks``) are drawn as
    vertical lines in different colours.
    """

    def __init__(self, do_filter=False):
        """Build the GL programs, the sample buffer and the polling timer.

        ``do_filter`` enables filtering of the first channel with the
        ``pyiir1`` band-pass created lazily in :meth:`on_timer`.
        """
        app.Canvas.__init__(self, title='Realtime ECG Detection',
                            keys='interactive')
        self.signal_program = gloo.Program(SIGNAL_VERT_SHADER,
                                           SIGNAL_FRAG_SHADER)
        self.marker_program = gloo.Program(MARKER_VERT_SHADER,
                                           MARKER_FRAG_SHADER)
        self.x_scale = x_scale_init
        self.y_scale = y_scale_init
        self.y_offset = y_offset_init
        self.signal_program['y_scale'] = self.y_scale
        self.signal_program['y_offset'] = self.y_offset
        self.signal_program['n_rows'] = 3
        self.do_filter = do_filter
        self.filter_delay = 0
        self.iir = None
        # Set on left-button press; must exist before the first release.
        self.last_pos = None
        self._init_data()
        # Created last so every attribute on_timer reads already exists.
        self._timer = app.Timer(1 / sfreq, connect=self.on_timer,
                                start=True)

    def _init_data(self):
        """(Re)create the sample buffer, the peak lists and the counters."""
        self.samps = RingBuffer(MAX_SAMPS, rows=3, dtype=np.float32)
        # Pre-fill with zeros so a full window can be drawn immediately.
        self.samps.append(np.zeros((x_scale_init + 1, 3)))
        self.p_peaks = np.array([]).astype(np.float32)
        self.d_peaks = np.array([]).astype(np.float32)
        self.e_peaks = np.array([]).astype(np.float32)
        self.m_peaks = np.array([]).astype(np.float32)
        self.prev_sample = 0
        self.this_samp = 0

    def reset(self):
        """Discard all buffered samples and markers."""
        print('Resetting data')
        self._init_data()

    def on_initialize(self, event):
        gloo.set_state(clear_color='black', blend=True,
                       blend_func=('src_alpha', 'one_minus_src_alpha'))

    def on_resize(self, event):
        """Propagate the new window size to both programs and the viewport."""
        self.width, self.height = event.size
        self.signal_program['height'] = self.height
        self.signal_program['width'] = self.width
        self.marker_program['height'] = self.height
        self.marker_program['width'] = self.width
        print('H: {} - W: {}'.format(self.height, self.width))
        gloo.set_viewport(0, 0, self.width, self.height)

    def _shifted_peaks(self, column):
        """Non-zero entries of ``column`` plus the filter delay, as float32."""
        hits = column[column != 0]
        return (hits + self.filter_delay).astype(np.float32)

    def on_timer(self, event):
        """Fetch new samples and markers from the buffer, then redraw."""
        newH = ftc.getHeader()
        if newH is None:
            # getHeader() can return None (see the startup check); skip this
            # tick instead of failing inside the timer callback.
            return
        if self.do_filter and self.iir is None:
            # Filter delay was computed for 0.5, 45.0 butterworth bandpass
            # at 250 Hz.
            # NOTE(review): the band-pass created below uses 8 and 20.0, not
            # 0.5 and 45.0 -- confirm the DELAYS table still applies.
            if newH.fSample not in DELAYS:
                print('No filter delay known for {} Hz, using 0'.format(
                    newH.fSample))
            self.filter_delay = DELAYS.get(newH.fSample, 0)
            self.iir = [iir.ButterworthBandPass(3, newH.fSample, 8, 20.0)
                        for _ in range(3)]
            for band_pass in self.iir:
                band_pass.reset()

        new_samples = newH.nSamples - self.prev_sample
        if new_samples > 0:
            # Never request more than the ring buffer can hold.
            beg = (self.prev_sample if new_samples < MAX_SAMPS
                   else newH.nSamples - MAX_SAMPS)
            data = ftc.getData([beg, newH.nSamples - 1])
            samps = np.copy(data[:, 0:3])
            if self.do_filter:
                # NOTE(review): only channel 0 is filtered although three
                # filters are created; on_draw shifts rows 1 and 2 by
                # filter_delay, so this looks deliberate -- confirm.
                for i in range(0, 1):
                    # presumably filter() works in place on the list -- verify
                    this_samps = samps[:, i].tolist()
                    self.iir[i].filter(this_samps)
                    samps[:, i] = np.array(this_samps)
            self.samps.append(samps, auto=True)
            self.this_samp = data[-1, 3]

            # p-peaks are de-duplicated against what is already stored.
            for cp in self._shifted_peaks(data[:, 4]):
                if cp not in self.p_peaks:
                    self.p_peaks = np.append(self.p_peaks, cp)
            # Kept as float32 like p_peaks so the equality test below
            # compares like with like.
            self.d_peaks = np.append(self.d_peaks,
                                     self._shifted_peaks(data[:, 5]))
            self.m_peaks = np.append(self.m_peaks,
                                     self._shifted_peaks(data[:, 6]))
            self.prev_sample = newH.nSamples

        # Forget markers that scrolled out of the visible window.  m_peaks
        # used to be left out here and grew without bound.
        oldest = self.this_samp - self.x_scale
        self.p_peaks = self.p_peaks[self.p_peaks > oldest]
        self.d_peaks = self.d_peaks[self.d_peaks > oldest]
        self.e_peaks = self.e_peaks[self.e_peaks > oldest]
        self.m_peaks = self.m_peaks[self.m_peaks > oldest]

        if self.p_peaks.shape[0] > 0 and self.d_peaks.shape[0] > 0:
            equal = np.intersect1d(self.p_peaks, self.d_peaks)
            if len(equal) > 0:
                # Peaks present in both lists move to e_peaks.  np.delete
                # takes indices, not values, so filter with a mask instead.
                self.e_peaks = np.append(self.e_peaks, equal)
                self.d_peaks = self.d_peaks[~np.isin(self.d_peaks, equal)]
                self.p_peaks = self.p_peaks[~np.isin(self.p_peaks, equal)]

        self.marker_program['start_samp'] = self.this_samp
        self.marker_program['x_scale'] = self.x_scale
        self.update()

    def _draw_markers(self, peaks, color):
        """Draw one full-height vertical line per entry of ``peaks``."""
        if peaks.shape[0] == 0:
            return
        # Each marker is a two-vertex line from y=-1 to y=1.
        self.marker_program['markers'] = np.repeat(peaks, 2)
        self.marker_program['y_pos'] = np.tile(
            [-1, 1], peaks.shape[0]).astype(np.float32)
        self.marker_program['color'] = color
        self.marker_program.draw('lines')

    def on_draw(self, event):
        """Draw the last ``x_scale`` samples of each row, then the markers."""
        gloo.clear()
        # The window may be wider than what is buffered (e.g. after widening
        # it and resetting); draw what exists instead of raising.
        n_pts = min(self.x_scale, len(self.samps))
        if n_pts > 0:
            span = float(self.x_scale)
            plain = (np.arange(-n_pts, 0, 1) / span).astype(np.float32)
            shifted = (
                np.arange(-n_pts + self.filter_delay, self.filter_delay, 1) /
                span).astype(np.float32)
            window = self.samps[-n_pts:]
            rows = zip(range(3), (plain, shifted, shifted), [2, 7, 6])
            for row, x_index, color in rows:
                self.signal_program['index'] = x_index
                self.signal_program['row'] = row
                self.signal_program['color'] = color
                self.signal_program['signal_pos'] = window[:, row].ravel()
                self.signal_program.draw('line_strip')

        self._draw_markers(self.p_peaks, 1)
        self._draw_markers(self.d_peaks, 3)
        self._draw_markers(self.e_peaks, 4)
        self._draw_markers(self.m_peaks, 6)

    def on_mouse_wheel(self, event):
        """Change the amplitude scale (never below 1)."""
        delta = int(10 * event.delta[1])
        self.y_scale = max(1, self.y_scale + delta)
        self.signal_program['y_scale'] = self.y_scale
        print('Scale: {}'.format(self.y_scale))

    def on_mouse_press(self, event):
        """Remember where a left-button drag started."""
        if event.button == 1:
            self.last_pos = event.pos[1]

    def on_mouse_release(self, event):
        """Turn a vertical left-button drag into a change of the offset."""
        if event.button == 1 and self.last_pos is not None:
            delta = event.pos[1] - self.last_pos
            self.y_offset -= 10 * delta
            self.signal_program['y_offset'] = self.y_offset
            print('Offset: {}'.format(self.y_offset))
        self.last_pos = None

    def print_mouse_event(self, event, what):
        """Debug helper: print the details of a mouse event."""
        modifiers = ', '.join([key.name for key in event.modifiers])
        print('{} - pos: {}, button: {}, modifiers: {}, delta: {}'.format(
              what, event.pos, event.button, modifiers, event.delta))

    def change_time_scale(self, value):
        """Widen or narrow the window by ``value`` samples (minimum 2)."""
        self.x_scale = max(2, self.x_scale + value)

    def on_key_press(self, event):
        """SPACE pauses/resumes, LEFT/RIGHT change the window, r resets."""
        if event.key == keys.SPACE:
            if self._timer.running:
                self._timer.stop()
            else:
                self._timer.start()
        elif event.key == keys.LEFT:
            self.change_time_scale(-1)
        elif event.key == keys.RIGHT:
            self.change_time_scale(1)
        elif event.key == 'r':
            self.reset()
        else:
            modifiers = [key.name for key in event.modifiers]
            print('Key pressed - text: %r, key: %s, modifiers: %r' % (
                  event.text, event.key.name
                  if event.key is not None else '', modifiers))


canvas = Canvas(do_filter=args.filter)
canvas.show()
# NOTE(review): the snippet is truncated at this point in the source; a vispy
# script normally still has to start the event loop -- confirm against the
# original file.
Source:monitor_db_watcher.py  
...29    return banner_output % command_output30def kill_monitor():31    logging.info("Killing scheduler")32    # try shutdown first33    utils.signal_program(monitor_db.PID_FILE_PREFIX, sig=signal.SIGINT)34    if utils.program_is_alive(monitor_db.PID_FILE_PREFIX): # was it killed?35        # give it some time to shutdown36        time.sleep(30)37        # kill it38        utils.signal_program(monitor_db.PID_FILE_PREFIX)39def handle_sigterm(signum, frame):40    logging.info('Caught SIGTERM')41    kill_monitor()42    utils.delete_pid_file_if_exists(monitor_db.WATCHER_PID_FILE_PREFIX)43    sys.exit(1)44signal.signal(signal.SIGTERM, handle_sigterm)45SiteMonitorProc = utils.import_site_class(46    __file__, 'autotest_lib.scheduler.site_monitor_db_watcher',47    'SiteMonitorProc', object)48class MonitorProc(SiteMonitorProc):49    def __init__(self, do_recovery=False):50        args = [monitor_db_path]51        if do_recovery:52            args.append("--recover-hosts")...Signal_program.spec
Source:Signal_program.spec  
1# -*- mode: python ; coding: utf-8 -*-2block_cipher = None3a = Analysis(4    ['Signal_program.py'],5    pathex=[],6    binaries=[],7    datas=[],8    hiddenimports=[],9    hookspath=[],10    hooksconfig={},11    runtime_hooks=[],12    excludes=[],13    win_no_prefer_redirects=False,14    win_private_assemblies=False,15    cipher=block_cipher,16    noarchive=False,17)18pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)19exe = EXE(20    pyz,21    a.scripts,22    a.binaries,23    a.zipfiles,24    a.datas,25    [],26    name='Signal_program',27    debug=False,28    bootloader_ignore_signals=False,29    strip=False,30    upx=True,31    upx_exclude=[],32    runtime_tmpdir=None,33    console=True,34    disable_windowed_traceback=False,35    argv_emulation=False,36    target_arch=None,37    codesign_identity=None,38    entitlements_file=None,39    icon='favicon.ico',...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
