...126 self._setup_rabbitmq()127 def _get_msg_properties(self):128 properties = pika.BasicProperties(**self._rabbitmq_msg_properties)129 return properties130 def _get_msg_from_queue(self):131 if self._queue_consumer is None:132 self._queue_consumer = self._start_queue_consumer()133 method, properties, body = self._queue_consumer.__next__()134 if not self._queue_consumption_lock.locked():135 self._queue_consumption_lock.acquire()136 return method.delivery_tag, body137 def get(self):138 self._queue_consumption_lock.acquire()139 delivery_tag, msg = self._get_msg_from_queue()140 self._last_msg_seen_delivery_tag = delivery_tag141 self._last_msg_seen_hash = xxhash.xxh64(msg).hexdigest()142 return msg143 def put(self, msg, rabbitmq_msg_properties={}):144 if self._put_lock.locked():145 raise QueueError("Put operation is disabled")146 msg_properties = self._get_msg_properties()147 if rabbitmq_msg_properties:148 for _property in rabbitmq_msg_properties:149 _property_value = rabbitmq_msg_properties[_property]150 setattr(msg_properties, _property, _property_value)151 if type(msg) == dict:152 msg = json.dumps(msg)153 SUCCESS = False...

...68 self._early_status = None69 self.status = {}70 self.interrupt = None71 self._failed = False72 def _get_msg_from_queue(self):73 """74 Helper method to handle safely getting messages from the queue.75 :return: Message, None if exception happened.76 :rtype: dict77 """78 try:79 return self.queue.get()80 # Let's catch all exceptions, since errors here mean a81 # crash in avocado.82 except Exception as details: # pylint: disable=W070383 self._failed = True84 TEST_LOG.error("RUNNER: Failed to read queue: %s", details)85 return None86 @property87 def early_status(self):88 """89 Get early status90 """91 if self._early_status:92 return self._early_status93 else:94 queue = []95 while not self.queue.empty():96 msg = self._get_msg_from_queue()97 if msg is None:98 break99 if "early_status" in msg:100 self._early_status = msg101 for _ in queue: # Return all unprocessed messages back102 self.queue.put(_)103 return msg104 else: # Not an early_status message105 queue.append(msg)106 def __getattribute__(self, name):107 # Update state before returning the value108 if name in ("status", "interrupt"):109 self._tick()110 return super(TestStatus, self).__getattribute__(name)111 def wait_for_early_status(self, proc, timeout):112 """113 Wait until early_status is obtained114 :param proc: test process115 :param timeout: timeout for early_state116 :raise exceptions.TestError: On timeout/error117 """118 step = 0.01119 end = time.time() + timeout120 while not self.early_status:121 if not proc.is_alive():122 if not self.early_status:123 raise exceptions.TestError("Process died before it pushed "124 "early test_status.")125 if time.time() > end and not self.early_status:126 os.kill(, signal.SIGTERM)127 if not wait.wait_for(lambda: not proc.is_alive(), 1, 0, 0.01):128 os.kill(, signal.SIGKILL)129 msg = ("Unable to receive test's early-status in %ss, "130 "something wrong happened probably in the "131 "avocado framework." % timeout)132 raise exceptions.TestError(msg)133 time.sleep(step)134 def _tick(self):135 """136 Process the queue and update current status137 """138 while not self.queue.empty():139 msg = self._get_msg_from_queue()140 if msg is None:141 break142 if "func_at_exit" in msg:143 self.job.funcatexit.register(msg["func_at_exit"],144 msg.get("args", tuple()),145 msg.get("kwargs", {}),146 msg.get("once", False))147 elif not msg.get("running", True):148 self.status = msg149 self.interrupt = True150 elif "paused" in msg:151 self.status = msg152 self.job.result_events_dispatcher.map_method('test_progress', False)153 paused_msg = msg['paused']...

