Best Python code snippet using tempest_python
TacoMar.py
Source:TacoMar.py  
...33    def execute_command(self, cmd_name, *args):34        return self.getCommandObject(cmd_name)(*args)35    def _wait(self, task, end_state=MAR_IDLE, timeout=5):36        with gevent.Timeout(timeout, RuntimeError("MAR detector: Timeout waiting for state")):37            while self._get_task_state(task, end_state):38                time.sleep(0.1)39    def _get_task_state(self, task, expected_state=MAR_IDLE):40        if self.execute_command("detector_substate", task) == expected_state:41            return 042        else:43            return 144        45    def wait(self):46        with gevent.Timeout(20, RuntimeError("Timeout waiting for detector")):47            logging.debug("CCD clearing...")48            if self._get_task_state(MAR_ACQUIRE):49                logging.debug("CCD integrating...")50            if self._get_task_state(MAR_READ):51                logging.debug("CCD reading out...")52                self._wait(MAR_READ)53                logging.debug("CCD reading done.")54            if self._get_task_state(MAR_CORRECT):55                logging.debug("CCD correcting...")56                self._wait(MAR_CORRECT)57                logging.debug("CCD correction done.")58    def prepare_acquisition(self, take_dark, start, osc_range, exptime, npass, number_of_images, comment="", energy=None, still=False):59        self.header["start_phi"] = start60        self.header["rotation_range"] = osc_range61        self.header["exposure_time"] = exptime62        self.header["dataset_comments"] = comment63        self.header["file_comments"] = ""64        self.current_filename = ""65	self.current_thumbnail2 = ""66	self.current_thumbnail1 = ""67        self.stop()68        if take_dark:...views.py
Source:views.py  
...70            auth = _authenticate(req['cc'])71            if auth['status'] == 'ok':72                return auth['competitor']73    raise PermissionDenied74def _get_task_state(task, competitor):75    answers = task.answer_set.filter(competitor=competitor).order_by('time')76    if len(answers) == 0:77        return 'open'78    last_answer = list(answers)[-1]79    if last_answer.code == task.code:80        return 'solved'81    if len(answers) == 1:82        return 'second chance'83    return 'blocked'84def task_list(request):85    competitor = _get_competitor_get(request)86    tasks = Task.objects.filter(enabled=True).order_by('nr')87    fail_penalty = Setting.objects.get(key='fail_penalty').value_dec88    tasks_list = []89    points = decimal.Decimal(0)90    for task in tasks:91        state = _get_task_state(task, competitor)92        if state == 'solved':93            points += task.points94        elif state == 'blocked':95            points -= fail_penalty96        tasks_list.append({97            'pk': task.pk,98            'nr': task.nr,99            'title': task.title,100            'state': state,101            'points': float(task.points)102        })103    return HttpResponse(json.dumps({'tasks': tasks_list, 'points': float(points)}))104def task_list_demo_mode(request):105    if not settings.DEMO_MODE:106        raise PermissionDenied107    tasks = Task.objects.filter(enabled=True).order_by('nr')108    fail_penalty = Setting.objects.get(key='fail_penalty').value_dec109    tasks_list = []110    for task in tasks:111        tasks_list.append({112            'pk': task.pk,113            'nr': task.nr,114            'title': task.title,115            'description': task.description,116            'points': float(task.points),117            'code': task.code,118            'state': 'open',119            'media_url': settings.MEDIA_URL120        })121    return HttpResponse(json.dumps({122        'tasks': tasks_list,123        'fail_penalty': 
float(fail_penalty),124        'points': 0.0125    }), content_type='text/json')126def task_get(request, task_id):127    competitor = _get_competitor_get(request)128    task = get_object_or_404(Task, pk=task_id, enabled=True)129    TaskView(competitor=competitor, task=task, time=timezone.now()).save()130    state = _get_task_state(task, competitor)131    response = {132        'nr': task.nr,133        'title': task.title,134        'description': task.description,135        'state': state,136        'media_url': settings.MEDIA_URL137    }138    return HttpResponse(json.dumps(response))139def task_enter_code(request, task_id):140    competitor = _get_competitor_post(request)141    task = get_object_or_404(Task, pk=task_id, enabled=True)142    state = _get_task_state(task, competitor)143    if state == 'solved':144        return HttpResponse('already solved')145    if state == 'blocked':146        return HttpResponse('already blocked')147    req = json.loads(request.body.decode('utf-8'))148    task.answer_set.create(competitor=competitor, code=req['code'], time=timezone.now())149    state = _get_task_state(task, competitor)...waiters.py
Source:waiters.py  
...19# NOTE(afazekas): This function needs to know a token and a subject.20def wait_for_server_status(client, server_id, status, ready_wait=True,21                           extra_timeout=0):22    """Waits for a server to reach a given status."""23    def _get_task_state(body):24        task_state = body.get('OS-EXT-STS:task_state', None)25        return task_state26    # NOTE(afazekas): UNKNOWN status possible on ERROR27    # or in a very early stage.28    resp, body = client.get_server(server_id)29    old_status = server_status = body['status']30    old_task_state = task_state = _get_task_state(body)31    start_time = int(time.time())32    timeout = client.build_timeout + extra_timeout33    while True:34        # NOTE(afazekas): Now the BUILD status only reached35        # between the UNKOWN->ACTIVE transition.36        # TODO(afazekas): enumerate and validate the stable status set37        if status == 'BUILD' and server_status != 'UNKNOWN':38            return39        if server_status == status:40            if ready_wait:41                if status == 'BUILD':42                    return43                # NOTE(afazekas): The instance is in "ready for action state"44                # when no task in progress45                # NOTE(afazekas): Converted to string bacuse of the XML46                # responses47                if str(task_state) == "None":48                    # without state api extension 3 sec usually enough49                    time.sleep(CONFIG.compute.ready_wait)50                    return51            else:52                return53        time.sleep(client.build_interval)54        resp, body = client.get_server(server_id)55        server_status = body['status']56        task_state = _get_task_state(body)57        if (server_status != old_status) or (task_state != old_task_state):58            LOG.info('State transition "%s" ==> "%s" after %d second wait',59                     '/'.join((old_status, str(old_task_state))),60               
      '/'.join((server_status, str(task_state))),61                     time.time() - start_time)62        if server_status == 'ERROR':63            raise exceptions.BuildErrorException(server_id=server_id)64        timed_out = int(time.time()) - start_time >= timeout65        if timed_out:66            message = ('Server %s failed to reach %s status within the '67                       'required time (%s s).' %68                       (server_id, status, timeout))69            message += ' Current status: %s.' % server_status70            raise exceptions.TimeoutException(message)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
