How to use _record_status method in avocado

Best Python code snippet using avocado_python

plugin_buffered_recorder.py

Source:plugin_buffered_recorder.py Github

copy

Full Screen

1from plugins.base import Process2from specialized.plugin_picamera import PiCameraProcessBase3from plugins.decorators import make_plugin4from specialized.camera_support.mux import DualBufferedMP45from specialized.plugin_media_manager import MEDIA_MANAGER_PLUGIN_NAME6from plugins.processes_host import find_plugin7from Pyro4 import expose as pyro_expose8import logging9from misc.logging import ensure_logging_setup, camel_to_snake10from datetime import datetime11from misc.settings import SETTINGS12from safe_picamera import PiVideoFrameType13from threading import Lock14import math15from specialized.plugin_status_led import Status16BUFFERED_RECORDER_PLUGIN_NAME = 'BufferedRecorder'17ensure_logging_setup()18_log = logging.getLogger(camel_to_snake(BUFFERED_RECORDER_PLUGIN_NAME))19@make_plugin(BUFFERED_RECORDER_PLUGIN_NAME, Process.CAMERA)20class BufferedRecorderPlugin(PiCameraProcessBase):21 def __init__(self):22 super(BufferedRecorderPlugin, self).__init__()23 self._last_sps_header_stamp = 024 self._recorder = DualBufferedMP4()25 self._record_user_info = None26 self._is_recording = False27 self._keep_media = True28 self._flush_lock = Lock()29 self._has_just_flushed = False30 self._buffer_max_age = None31 self._sps_header_max_age = None32 self._footage_max_age = None33 self._record_status = None34 self._record_status_lock = Lock()35 def __enter__(self):36 super(BufferedRecorderPlugin, self).__enter__()37 self._has_just_flushed = True38 self._recorder.__enter__()39 return self40 def __exit__(self, exc_type, exc_val, exc_tb):41 self._set_recording_status(False)42 self._recorder.__exit__(exc_type, exc_val, exc_tb)43 super(BufferedRecorderPlugin, self).__exit__(exc_type, exc_val, exc_tb)44 def _set_recording_status(self, value):45 with self._record_status_lock:46 if value and self._record_status is None:47 self._record_status = Status.pulse((1, 0, 0))48 self._record_status.__enter__()49 elif not value and self._record_status is not None:50 self._record_status.__exit__(None, None, None)51 self._record_status = None52 @pyro_expose53 @property54 def footage_age(self):55 return self._recorder.footage_age56 @pyro_expose57 @property58 def buffer_age(self):59 return self._recorder.buffer_age60 @pyro_expose61 @property62 def total_age(self):63 return self._recorder.total_age64 @pyro_expose65 @property66 def footage_max_age(self):67 return self._footage_max_age68 @property69 def _camera(self):70 return self.root_picamera_plugin.camera71 @property72 def _last_frame(self):73 return self._camera.frame74 @pyro_expose75 @property76 def buffer_max_age(self):77 if self._buffer_max_age is None:78 # Lazily load this value, because we must be sure that a camera is instantiated79 self._buffer_max_age = 2 * self._camera.framerate * \80 SETTINGS.camera.get('buffer', cast_to_type=float, default=2.0, ge=1.0)81 return self._buffer_max_age82 @pyro_expose83 @buffer_max_age.setter84 def buffer_max_age(self, value):85 self._buffer_max_age = max(self._camera.framerate * 0.5, 1, value)86 @pyro_expose87 @property88 def sps_header_max_age(self):89 if self._sps_header_max_age is None:90 # Lazily load this value, because we must be sure that a camera is instantiated91 self._sps_header_max_age = self._camera.framerate * SETTINGS.camera.get(92 'clip_length_tolerance', cast_to_type=float, default=1.0, ge=1.0)93 return self._sps_header_max_age94 @pyro_expose95 @sps_header_max_age.setter96 def sps_header_max_age(self, value):97 self._sps_header_max_age = max(self._camera.framerate * 0.5, 1, value)98 @property99 def _last_sps_header_age(self):100 return self._recorder.total_age - self._last_sps_header_stamp101 def _handle_split_point(self):102 if self.footage_max_age is not None and self.footage_age >= self.footage_max_age:103 self._stop_and(True, handle_split_point_if_flushed=False)104 self._footage_max_age = None105 if self._recorder.is_recording and not self._is_recording:106 # We requested stop, but we haven't reached a split point. Now we can really stop.107 if self._keep_media:108 media_mgr = find_plugin(MEDIA_MANAGER_PLUGIN_NAME, Process.CAMERA)109 if not media_mgr:110 _log.error('No media manager is running on the CAMERA process.')111 _log.info('Discarding media with info %s.', str(self._record_user_info))112 self._recorder.stop_and_discard()113 else:114 file_name = self._recorder.stop_and_finalize(self._camera.framerate, self._camera.resolution)115 media = media_mgr.deliver_media(file_name, 'mp4', self._record_user_info)116 _log.info('Media %s with info %s was delivered.', str(media.uuid), str(self._record_user_info))117 else:118 _log.info('Discarding media with info %s.', str(self._record_user_info))119 self._recorder.stop_and_discard()120 self._record_user_info = None121 if self._recorder.buffer_age > self.buffer_max_age:122 self._recorder.rewind_buffer()123 # Update the sps header age124 self._last_sps_header_stamp = self._recorder.total_age125 @pyro_expose126 def record(self, info=None, stop_after_seconds=None):127 _log.info('Requested media with info %s of maximum length %s.', str(info), str(stop_after_seconds))128 self._keep_media = True129 self._is_recording = True130 self._record_user_info = info131 if stop_after_seconds is not None:132 stop_after_seconds = float(stop_after_seconds)133 if stop_after_seconds < 0 or math.isinf(stop_after_seconds):134 self._footage_max_age = None135 else:136 self._footage_max_age = int(max(1., stop_after_seconds) * self._camera.framerate)137 self._set_recording_status(True)138 self._recorder.record()139 @pyro_expose140 @property141 def is_recording(self):142 return self._recorder.is_recording and self._is_recording143 @pyro_expose144 @property145 def is_finalizing(self):146 return self._recorder.is_recording and self._keep_media and not self._is_recording147 def _stop_and(self, finalize, handle_split_point_if_flushed=True):148 self._set_recording_status(False)149 self._is_recording = False150 self._keep_media = finalize151 if handle_split_point_if_flushed:152 with self._flush_lock:153 # This is the only other split point at which we are sure that an SPS will have to follow154 if self._has_just_flushed:155 self._handle_split_point()156 @pyro_expose157 def stop_and_discard(self):158 self._stop_and(finalize=False)159 @pyro_expose160 def stop_and_finalize(self):161 self._stop_and(finalize=True)162 def write(self, data):163 with self._flush_lock:164 self._has_just_flushed = False165 # Update annotation166 self._camera.annotate_text = datetime.now().strftime('%Y-%m-%d %H:%M:%S')167 # If it's a split point, one can stop168 if self._last_frame.frame_type == PiVideoFrameType.sps_header:169 self._handle_split_point()170 self._recorder.append(data, True, self._last_frame.complete)171 else:172 self._recorder.append(data, False, self._last_frame.complete)173 # Do we need to request a new sps_header174 if self._last_sps_header_age > min(self.sps_header_max_age, self.buffer_max_age):175 self._camera.request_key_frame()176 def flush(self):177 with self._flush_lock:178 self._has_just_flushed = True...

Full Screen

Full Screen

journal.py

Source:journal.py Github

copy

Full Screen

...62 if res.fetchone() is None:63 sql = "INSERT INTO job_info (unique_id) VALUES (?)"64 self.journal_cursor.execute(sql, (state['job_unique_id'], ))65 self.journal.commit()66 def _record_status(self, state, action):67 sql = "INSERT INTO test_journal (tag, time, action, status) VALUES (?, ?, ?, ?)"68 # This shouldn't be required69 if action == "ENDED":70 status = state['status']71 else:72 status = None73 self.journal_cursor.execute(sql,74 (repr(state['name']),75 datetime.datetime(1, 1, 1).now().isoformat(),76 action,77 status))78 self.journal.commit()79 def pre_tests(self, job):80 pass81 def start_test(self, result, state):82 self.lazy_init_journal(state)83 self._record_status(state, "STARTED")84 def test_progress(self, progress=False):85 pass86 def end_test(self, result, state):87 self.lazy_init_journal(state)88 self._record_status(state, "ENDED")89 def post_tests(self, job):90 self._shutdown_journal()91class Journal(CLI):92 """93 Test journal94 """95 name = 'journal'96 description = "Journal options for the 'run' subcommand"97 def configure(self, parser):98 run_subcommand_parser = parser.subcommands.choices.get('run', None)99 if run_subcommand_parser is None:100 return101 self.parser = parser102 help_msg = ('Records test status changes (for use with '...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful