Best Python code snippet using avocado_python
plugin_buffered_recorder.py
Source:plugin_buffered_recorder.py  
from plugins.base import Process
from specialized.plugin_picamera import PiCameraProcessBase
from plugins.decorators import make_plugin
from specialized.camera_support.mux import DualBufferedMP4
from specialized.plugin_media_manager import MEDIA_MANAGER_PLUGIN_NAME
from plugins.processes_host import find_plugin
from Pyro4 import expose as pyro_expose
import logging
from misc.logging import ensure_logging_setup, camel_to_snake
from datetime import datetime
from misc.settings import SETTINGS
from safe_picamera import PiVideoFrameType
from threading import Lock
import math
from specialized.plugin_status_led import Status

BUFFERED_RECORDER_PLUGIN_NAME = 'BufferedRecorder'

ensure_logging_setup()
_log = logging.getLogger(camel_to_snake(BUFFERED_RECORDER_PLUGIN_NAME))


@make_plugin(BUFFERED_RECORDER_PLUGIN_NAME, Process.CAMERA)
class BufferedRecorderPlugin(PiCameraProcessBase):
    """Plugin that pipes incoming video data into a ``DualBufferedMP4``.

    Data arrives through :meth:`write`. Recordings are started with
    :meth:`record` and stopped with :meth:`stop_and_finalize` or
    :meth:`stop_and_discard`; the stop is only carried out at a split point
    (an SPS header frame, or right after a flush), see
    :meth:`_handle_split_point`.

    NOTE(review): all "age" values are compared against quantities computed
    as ``seconds * framerate``, so they are presumably frame counts -- confirm
    against ``DualBufferedMP4``.
    """

    def __init__(self):
        super(BufferedRecorderPlugin, self).__init__()
        # Value of ``_recorder.total_age`` at the last handled split point.
        self._last_sps_header_stamp = 0
        self._recorder = DualBufferedMP4()
        # Opaque user payload handed to the media manager on delivery.
        self._record_user_info = None
        # What the user asked for; the recorder itself may lag behind until
        # the next split point.
        self._is_recording = False
        # Whether the footage is delivered (True) or discarded (False) once
        # the recorder is actually stopped.
        self._keep_media = True
        self._flush_lock = Lock()
        self._has_just_flushed = False
        # The three limits below are loaded lazily / set on demand.
        self._buffer_max_age = None
        self._sps_header_max_age = None
        self._footage_max_age = None
        # Active ``Status.pulse`` context while recording, else None.
        self._record_status = None
        self._record_status_lock = Lock()

    def __enter__(self):
        """Enter the base plugin and the recorder; return ``self``."""
        super(BufferedRecorderPlugin, self).__enter__()
        self._has_just_flushed = True
        self._recorder.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clear the recording status, then exit the recorder and the base plugin."""
        self._set_recording_status(False)
        self._recorder.__exit__(exc_type, exc_val, exc_tb)
        super(BufferedRecorderPlugin, self).__exit__(exc_type, exc_val, exc_tb)

    def _set_recording_status(self, value):
        """Enter (``value`` truthy) or exit (``value`` falsy) the status indicator.

        The call is idempotent: the ``Status.pulse`` context is entered only if
        none is active and exited only if one is active.
        """
        with self._record_status_lock:
            if value and self._record_status is None:
                # NOTE(review): (1, 0, 0) is presumably an RGB triple (red) -- confirm.
                self._record_status = Status.pulse((1, 0, 0))
                self._record_status.__enter__()
            elif not value and self._record_status is not None:
                self._record_status.__exit__(None, None, None)
                self._record_status = None

    @pyro_expose
    @property
    def footage_age(self):
        """Proxy for the recorder's ``footage_age``."""
        return self._recorder.footage_age

    @pyro_expose
    @property
    def buffer_age(self):
        """Proxy for the recorder's ``buffer_age``."""
        return self._recorder.buffer_age

    @pyro_expose
    @property
    def total_age(self):
        """Proxy for the recorder's ``total_age``."""
        return self._recorder.total_age

    @pyro_expose
    @property
    def footage_max_age(self):
        """Footage age at which the recording is stopped automatically, or None."""
        return self._footage_max_age

    @property
    def _camera(self):
        """Camera object owned by the root picamera plugin."""
        return self.root_picamera_plugin.camera

    @property
    def _last_frame(self):
        """The camera's current ``frame`` attribute."""
        return self._camera.frame

    @pyro_expose
    @property
    def buffer_max_age(self):
        """Buffer age above which the buffer is rewound at a split point.

        Defaults to ``2 * framerate * SETTINGS.camera['buffer']`` (setting
        default 2.0, constrained with ``ge=1.0``).
        """
        if self._buffer_max_age is None:
            # Lazily load this value, because we must be sure that a camera is instantiated
            self._buffer_max_age = 2 * self._camera.framerate * \
                SETTINGS.camera.get('buffer', cast_to_type=float, default=2.0, ge=1.0)
        return self._buffer_max_age

    @pyro_expose
    @buffer_max_age.setter
    def buffer_max_age(self, value):
        # Never go below half the framerate, nor below 1.
        self._buffer_max_age = max(self._camera.framerate * 0.5, 1, value)

    @pyro_expose
    @property
    def sps_header_max_age(self):
        """Age of the last SPS header above which a new key frame is requested.

        Defaults to ``framerate * SETTINGS.camera['clip_length_tolerance']``
        (setting default 1.0, constrained with ``ge=1.0``).
        """
        if self._sps_header_max_age is None:
            # Lazily load this value, because we must be sure that a camera is instantiated
            self._sps_header_max_age = self._camera.framerate * SETTINGS.camera.get(
                'clip_length_tolerance', cast_to_type=float, default=1.0, ge=1.0)
        return self._sps_header_max_age

    @pyro_expose
    @sps_header_max_age.setter
    def sps_header_max_age(self, value):
        # Never go below half the framerate, nor below 1.
        self._sps_header_max_age = max(self._camera.framerate * 0.5, 1, value)

    @property
    def _last_sps_header_age(self):
        """Recorder ``total_age`` elapsed since the last handled split point."""
        return self._recorder.total_age - self._last_sps_header_stamp

    def _handle_split_point(self):
        """Carry out everything that may only happen at a split point.

        In order: enforce the footage length limit, complete a pending stop
        request (delivering or discarding the media), rewind the buffer if it
        grew past ``buffer_max_age``, and remember the current ``total_age``
        as the stamp of the last SPS header.
        """
        if self.footage_max_age is not None and self.footage_age >= self.footage_max_age:
            # We are already at a split point (and the caller may hold the
            # non-reentrant _flush_lock), so do not let _stop_and call back here.
            self._stop_and(True, handle_split_point_if_flushed=False)
            self._footage_max_age = None
        if self._recorder.is_recording and not self._is_recording:
            # We requested stop, but we haven't reached a split point. Now we can really stop.
            if self._keep_media:
                media_mgr = find_plugin(MEDIA_MANAGER_PLUGIN_NAME, Process.CAMERA)
                if not media_mgr:
                    _log.error('No media manager is running on the CAMERA process.')
                    _log.info('Discarding media with info %s.', str(self._record_user_info))
                    self._recorder.stop_and_discard()
                else:
                    file_name = self._recorder.stop_and_finalize(self._camera.framerate, self._camera.resolution)
                    media = media_mgr.deliver_media(file_name, 'mp4', self._record_user_info)
                    _log.info('Media %s with info %s was delivered.', str(media.uuid), str(self._record_user_info))
            else:
                _log.info('Discarding media with info %s.', str(self._record_user_info))
                self._recorder.stop_and_discard()
            self._record_user_info = None
        if self._recorder.buffer_age > self.buffer_max_age:
            self._recorder.rewind_buffer()
        # Update the sps header age
        self._last_sps_header_stamp = self._recorder.total_age

    @pyro_expose
    def record(self, info=None, stop_after_seconds=None):
        """Start recording.

        :param info: Payload stored with the recording and passed to the media
            manager when the media is delivered.
        :param stop_after_seconds: Optional maximum clip length in seconds
            (anything ``float()`` accepts). Values below one second are raised
            to one second. ``None``, a negative value or an infinite value
            means no automatic stop.
        """
        _log.info('Requested media with info %s of maximum length %s.', str(info), str(stop_after_seconds))
        self._keep_media = True
        self._is_recording = True
        self._record_user_info = info
        if stop_after_seconds is None:
            # Drop any limit left over from an earlier recording that was
            # stopped manually before reaching it; otherwise this unlimited
            # recording would silently be cut at the stale limit.
            self._footage_max_age = None
        else:
            stop_after_seconds = float(stop_after_seconds)
            if stop_after_seconds < 0 or math.isinf(stop_after_seconds):
                self._footage_max_age = None
            else:
                self._footage_max_age = int(max(1., stop_after_seconds) * self._camera.framerate)
        self._set_recording_status(True)
        self._recorder.record()

    @pyro_expose
    @property
    def is_recording(self):
        """True while the recorder records and no stop has been requested."""
        return self._recorder.is_recording and self._is_recording

    @pyro_expose
    @property
    def is_finalizing(self):
        """True while a stop-and-keep request is waiting for a split point."""
        return self._recorder.is_recording and self._keep_media and not self._is_recording

    def _stop_and(self, finalize, handle_split_point_if_flushed=True):
        """Request a stop; the recorder is really stopped at the next split point.

        :param finalize: True to keep (deliver) the media, False to discard it.
        :param handle_split_point_if_flushed: When True and the output has just
            been flushed, handle the split point right away.
        """
        self._set_recording_status(False)
        self._is_recording = False
        self._keep_media = finalize
        if handle_split_point_if_flushed:
            with self._flush_lock:
                # This is the only other split point at which we are sure that an SPS will have to follow
                if self._has_just_flushed:
                    self._handle_split_point()

    @pyro_expose
    def stop_and_discard(self):
        """Request a stop and throw the footage away."""
        self._stop_and(finalize=False)

    @pyro_expose
    def stop_and_finalize(self):
        """Request a stop and keep the footage."""
        self._stop_and(finalize=True)

    def write(self, data):
        """Append ``data`` to the recorder, handling split points on SPS headers.

        NOTE(review): ``write``/``flush`` look like the file-like interface of a
        camera recording output -- confirm against ``PiCameraProcessBase``.
        """
        with self._flush_lock:
            self._has_just_flushed = False
        # Update annotation
        self._camera.annotate_text = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # If it's a split point, one can stop
        if self._last_frame.frame_type == PiVideoFrameType.sps_header:
            self._handle_split_point()
            self._recorder.append(data, True, self._last_frame.complete)
        else:
            self._recorder.append(data, False, self._last_frame.complete)
        # Do we need to request a new sps_header
        if self._last_sps_header_age > min(self.sps_header_max_age, self.buffer_max_age):
            self._camera.request_key_frame()

    def flush(self):
        """Mark the output as just flushed, which makes an immediate stop possible."""
        with self._flush_lock:
            self._has_just_flushed = True
...
journal.py
Source:journal.py  
...62        if res.fetchone() is None:63            sql = "INSERT INTO job_info (unique_id) VALUES (?)"64            self.journal_cursor.execute(sql, (state['job_unique_id'], ))65            self.journal.commit()66    def _record_status(self, state, action):67        sql = "INSERT INTO test_journal (tag, time, action, status) VALUES (?, ?, ?, ?)"68        # This shouldn't be required69        if action == "ENDED":70            status = state['status']71        else:72            status = None73        self.journal_cursor.execute(sql,74                                    (repr(state['name']),75                                     datetime.datetime(1, 1, 1).now().isoformat(),76                                     action,77                                     status))78        self.journal.commit()79    def pre_tests(self, job):80        pass81    def start_test(self, result, state):82        self.lazy_init_journal(state)83        self._record_status(state, "STARTED")84    def test_progress(self, progress=False):85        pass86    def end_test(self, result, state):87        self.lazy_init_journal(state)88        self._record_status(state, "ENDED")89    def post_tests(self, job):90        self._shutdown_journal()91class Journal(CLI):92    """93    Test journal94    """95    name = 'journal'96    description = "Journal options for the 'run' subcommand"97    def configure(self, parser):98        run_subcommand_parser = parser.subcommands.choices.get('run', None)99        if run_subcommand_parser is None:100            return101        self.parser = parser102        help_msg = ('Records test status changes (for use with '...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
