Best Python code snippet using tempest_python
api.py
Source:api.py  
1# -*- coding: utf-8 -*-2import re3from django.utils import timezone4from rest_framework import exceptions5from rest_framework.decorators import detail_route6from rest_framework.response import Response7from common_framework.utils.cache import CacheProduct8from event import models as event_models9from event.utils import common as event_common10from event.web import api as base_api, error11from event_exam.utils.task import TaskHandler12from event_exam.utils.exam_auth import api_auth_permission13from event_exam.models import SolvedRecord, SubmitRecord, SubmitFlags14from event.models import EventTask15from practice import api as practice_api16from practice_capability.models import TestPaperTask17from practice_theory.models import ChoiceTask18from practice_real_vuln.models import RealVulnTask19from practice_exercise.models import PracticeExerciseTask20from practice.private_api import _commit_task_answer21from x_note.models import Note22from . import serializers as mserializers23from event_exam import models as exam_models24from django.contrib import messages25from django.utils.translation import ugettext_lazy as _26import random27import collections28import json29DELIMITER = '|'30multiple = 131judgment = 232radio = 033operation = 434is_choice = 135RealVulnType = 136PracticeExerciseType = 237DEFAULT_CACHE_TIME = 60 * 538class Serializer:39    def __init__(self, raw, et):40        self.data = {41            'title': raw.title,42            'id': str(raw.id),43            'hash': str(raw.hash),44            'score': float(et.task_score),45            'content': raw.content46        }47        p_type = int(raw.hash.split('.')[-1])48        if p_type == 0:49            self.data['options'] = collections.OrderedDict(sorted(json.loads(raw.option).items(), key=lambda t: t[0]))50            # self.data['options'] = raw.option51            self.data['is_choice_question'] = 152            self.data['is_multiple_choice'] = 1 if raw.multiple else 053            self.data['question_type'] = raw.multiple54        else:55            if raw.file and raw.file.url:56                file_attach = {57                    'name': raw.file.name,58                    'url': raw.file.url,59                }60                self.data['file_url'] = file_attach61            self.data['url'] = raw.url62            self.data['is_dynamic_env'] = raw.is_dynamic_env63            self.data['question_type'] = operation64            self.data['solving_mode'] = raw.solving_mode65            self.data['is_choice_question'] = 066            self.data['score_mutiple'] = raw.score_multiple67class EventViewSet(base_api.EventViewSet):68    serializer_class = mserializers.EventSerializer69    @detail_route(methods=['get'], )70    @api_auth_permission71    def exam_task(self, request, pk):72        try:73            event = event_models.Event.objects.get(pk=pk)74        except Exception, e:75            raise exceptions.PermissionDenied(error.EVENT_NOT_START)76        taskArrary = event_models.EventTask.objects.filter(event=pk)77        rows = []78        done = False79        score = 080        all_score = 081        myAnswer = []82        eus = event_models.EventUserSubmitLog.objects.filter(event_task__event=pk, user=request.user)83        if eus.exists():84            done = True85            try:86                x_note = Note.objects.get(resource=event.hash, user=request.user)87                writeup_score = x_note.score88            except:89                writeup_score = 090            score = score + writeup_score91            for e in eus:92                right_answer = practice_api.get_task_answer(e.event_task.task_hash, request.user)93                tmp = {94                    'taskId': e.event_task.task_hash,95                    'answer': e.answer,96                }97                if event.extendevent.ans_display_method == 0:98                    tmp['rightAnswer'] = right_answer99                if event.extendevent.ans_display_method == 1:100                    if event.end_time < timezone.now():101                        tmp['rightAnswer'] = right_answer102                score = score + e.score103                myAnswer.append(tmp)104        for t in taskArrary:105            task = practice_api.get_task_object(t.task_hash)106            rows.append(Serializer(task, t).data)107            all_score = all_score + t.task_score108        '''109        è·åè¯å·ç¶æï¼1.å·²ç»ç»æï¼2.æ£å¨è¿è¡ï¼3.å°æªå¼å§110        '''111        now = timezone.now().strftime('%Y-%m-%d %H:%M:%S')112        if (event.start_time.strftime('%Y-%m-%d %H:%M:%S') <= now) and (113                    event.end_time.strftime('%Y-%m-%d %H:%M:%S') > now):114            process = 0115        elif (event.start_time.strftime('%Y-%m-%d %H:%M:%S') > now):116            process = 1117        else:118            process = 2119        return Response(120            {121                'response_data':122                    {123                        'tasks': rows, 'done': done,124                        'myAnswer': myAnswer, 'score': score,125                        'all_score': all_score,126                        'score_status': event.extendevent.score_status,127                        'rank_status': event.extendevent.rank_status128                    },129                'error_code': 0,130                'process': process131            })132    @detail_route(methods=['GET'], )133    @api_auth_permission134    def exam_user_ranking(self, request, pk):135        """136        è·åå½åç¨æ·èè¯æå137        """138        try:139            event = event_models.Event.objects.get(pk=pk)140        except Exception, e:141            raise exceptions.PermissionDenied(error.EVENT_NOT_START)142        from django.db.models import Sum143        eus = event_models.EventUserSubmitLog.objects.filter(event_task__event=pk).values('user').annotate(144            score=Sum('score'))145        x_note = Note.objects.filter(resource=event.hash).values('user').annotate(score=Sum('score'))146        # æ·»å writeup_score147        for data in eus:148            for note in x_note:149                if note['user'] == data['user']:150                    data['score'] += 2151        sorted_scores = sorted(list(eus), key=lambda a: -a.get('score'))152        ranking = ''153        for k, value in enumerate(sorted_scores):154            if value['user'] == request.user.id:155                ranking = k + 1156        return Response(data={'ranking': ranking})157    @detail_route(methods=['get'], )158    @api_auth_permission159    def get_tasks(self, request, pk):160        event = event_common.get_event_by_id(pk)161        if event.start_time > timezone.now():162            return Response({'response': {} , 'error_code': 1})163        if not event.public:164            return Response({'response': {}, 'error_code': 1})165        basic_information = {}166        grade = exam_models.SubmitRecord.objects.filter(event_id=event.id).filter(user_id=request.user).first()167        if grade is not None:168            basic_information["is_over"] = "true"169            basic_information["show_score"] = event.extendevent.score_status170            basic_information["answer"] = grade.answer171            if event.extendevent.score_status:172                basic_information["get_score"] = grade.score173                answer_dict, score_dict = self.get_correct_answer(event, pk)174                basic_information["correct_answer"] = answer_dict175        else:176            count_time = self.count_time(event)177            if count_time == 0 :178                basic_information["is_over"] = "true"179            else:180                basic_information["is_over"] = 'false'181                basic_information["count_time"] = count_time182        answer_record = SolvedRecord.objects.filter(user=request.user).filter(event=event).first()183        if answer_record is None:184            exam_models.SolvedRecord.objects.create(185                event=event,186                user=request.user,187            )188        return Response({'response': basic_information, 'error_code': 0})189    @detail_route(methods=['get'], )190    @api_auth_permission191    def review(self, request, pk):192        event = event_common.get_event_by_id(pk)193        if event.start_time > timezone.now():194            return Response({'response': {} , 'error_code': 1})195        if not event.public:196            return Response({'response': {}, 'error_code': 1})197        basic_information = {}198        grade = exam_models.SubmitRecord.objects.filter(event_id=event.id).filter(user_id=request.user).first()199        if grade is not None:200            basic_information["is_over"] = True201            basic_information["show_score"] = event.extendevent.score_status202            basic_information["rank_status"] = event.extendevent.rank_status203            basic_information["answer"] = grade.answer204            if event.extendevent.score_status:205                basic_information["get_score"] = grade.score206            if event.extendevent.rank_status:207                answer_dict, score_dict = self.get_correct_answer(event, pk)208                basic_information["correct_answer"] = answer_dict209                submit_reslut = SubmitFlags.objects.filter(event=event, user=request.user)210                is_right = {}211                if submit_reslut.exists():212                    for flag_result in submit_reslut:213                        is_right[flag_result.topic_hash] = True if flag_result.score != 0 else False214                basic_information["flag_is_right"] = is_right215        else:216            basic_information["is_over"] = False217            basic_information["answer"] = ''218        return Response({'response': basic_information, 'error_code': 0})219    def get_correct_answer(self, event, pk):220        test_list_cache = CacheProduct("test_list_cache")221        test_score_cache  = CacheProduct("test_score_cache")222        key = "%d_%d" % (int(pk), event.id)223        test_list_dateil = test_list_cache.get(key, None)224        test_score_dateil = test_score_cache.get(key, None)225        if test_list_dateil is None or test_score_dateil is None:226            task_arrary = self.get_event_task(pk)227            answer_dict = {}228            score_dict = {}229            for t in task_arrary:230                task = practice_api.get_task_object(t.task_hash)231                score_dict[int(task.id)] = t.task_score232                if task.answer is None:233                    is_dynamic_env = hasattr(task, "is_dynamic_env") if hasattr(task, "is_dynamic_env") else False234                    if is_dynamic_env:235                        task_answer = _("x_dynamic_answer")236                    else:237                        task_answer = _("x_answer_not_generated")238                else:239                    task_answer = task.answer240                answer_dict[int(task.id)] = str(task_answer)241            test_list_cache.set(key, answer_dict, DEFAULT_CACHE_TIME)242            test_score_cache.set(key, score_dict, DEFAULT_CACHE_TIME)243        else:244            answer_dict = test_list_dateil245            score_dict = test_score_dateil246        return answer_dict, score_dict247    # æ¶é´äºä»¶248    def count_time(self, event):249        count_time = int((event.end_time - timezone.now()).total_seconds())250        if count_time < 0:251            return 0252        else:253            return count_time254    @detail_route(methods=['POST'], )255    @api_auth_permission256    def submit_testpaper(self, request, pk):257        event = event_common.get_event_by_id(pk)258        if not event:259            return Response({'error_code': 1})260        taskhash = str(request.data.get('taskhash'))261        tasktype = int(taskhash.split(".")[-1])262        task_id = request.data.get("topic_id") #该é¢ç®çid263        answers = str(request.data.get('answers').encode('utf-8')) #çæ¡264        user = request.user265        # 没æçæ¡266        if not answers:267            return Response({'error_code': 2})268        test_information = None269        if tasktype == RealVulnType:270            test_information = RealVulnTask.objects.filter(hash=taskhash).first()271        elif tasktype == PracticeExerciseType:272            test_information = PracticeExerciseTask.objects.filter(hash=taskhash).first()273        if not event or not test_information:274            return Response({'error_code': 1})275        _ret = _commit_task_answer(tasktype, test_information, user, answers)276        if _ret['is_solved'] and (not _ret.has_key("is_over")):277            # correct_answer, score_dict = self.get_correct_answer(event, pk)278            # topic_score = score_dict[int(task_id)]  # è·å该é¢ç®çåæ°279            task = EventTask.objects.filter(task_hash=taskhash).first()280            topic_score = task.task_score281            get_score = float(format(topic_score * float(_ret["score"]) / float(_ret['all_score']), '.2f'))282            submit_record = SubmitFlags.objects.filter(event=event, user=request.user, topic_hash=taskhash)283            submit_record_ob = submit_record.first()284            if submit_record_ob is not None:285                submit_record.update(286                    score = get_score287                )288            else:289                SubmitFlags.objects.create(290                    event=event,291                    user=request.user,292                    score=get_score,293                    topic_hash=taskhash294                )295        else:296            pass297        _ret['score_per'] = float(format(float(_ret["score"]) / float(_ret['all_score']), '.2f'))298        _ret["num_task"] = len(event_models.EventTask.objects.filter(event=event))299        _ret["num_done_task"] = request.data.get('topic_num')300        _ret["solving_mode"] = test_information.solving_mode301        _ret["question_type"] = test_information.multiple if (test_information.hash.split(".")[-1] == 0) else operation302        return Response({'response': _ret, 'error_code': 0})303    @detail_route(methods=['POST'], )304    @api_auth_permission305    def finish_up_job(self, request, pk):306        # 交å·307        event = event_common.get_event_by_id(pk)308        user = request.user309        count_time = self.count_time(event)310        if count_time <= 0:311            return Response({'error_code': 1})312        answers = request.data.get('answer')313        get_score = 0314        if answers != "" and answers is not None:315            answers_dict = json.loads(answers)316            correct_answer, score_dict = self.get_correct_answer(event, pk)317            for id, answer in answers_dict.items():318                topic_correct_answer = correct_answer[int(id)].split("|")319                topic_score = score_dict[int(id)]320                if re.match(r'^[A-G]*$', answer) is None:321                    pass322                else:323                   topic_user_answer = list(str(answer))324                   standard_list1 = set(topic_user_answer)325                   standard_list2 = set(topic_correct_answer)326                   if len(standard_list1.difference(standard_list2)) == 0 and len(327                           standard_list2.difference(standard_list1)) == 0:328                       get_score = get_score + topic_score329        if SubmitFlags.objects.filter(event=event, user=user).exists():330            for user_topic_score in SubmitFlags.objects.filter(event=event, user=user):331                get_score = get_score + float(user_topic_score.score)332        grade = exam_models.SubmitRecord.objects.filter(event_id=event.id).filter(user_id=request.user)333        if grade.exists():334            grade.update(335                answer=answers,336                score=get_score337            )338        else:339            exam_models.SubmitRecord.objects.create(340                event=event,341                user=user,342                answer=answers,343                score=get_score344            )345        t = SolvedRecord.objects.filter(event=event).filter(user=user)346        t.delete()347        return Response({'error_code': 0})348    def get_event_task(self, pk):349        task_cache = CacheProduct("get_event_task")350        key = pk351        event_task = task_cache.get(key, None)352        if event_task:353            return event_task354        event_task = event_models.EventTask.objects.filter(event=pk)355        if event_task:356            task_cache.set(key, event_task, DEFAULT_CACHE_TIME)357        return event_task358class EventSignupUserViewSet(base_api.EventSignupUserViewSet):359    serializer_class = mserializers.EventSignupUserSerializer360class EventSignupTeamViewSet(base_api.EventSignupTeamViewSet):361    serializer_class = mserializers.EventSignupTeamSerializer362class EventTaskViewSet(base_api.EventTaskViewSet):363    serializer_class = mserializers.EventTaskSerializer364class EventUserSubmitLogViewSet(base_api.EventUserSubmitLogViewSet):365    serializer_class = mserializers.EventUserSubmitLogSerializer366    task_handler_class = TaskHandler367class EventUserAnswerViewSet(base_api.EventUserAnswerViewSet):368    serializer_class = mserializers.EventUserAnswerSerializer369class EventNoticeViewSet(base_api.EventNoticeViewSet):370    serializer_class = mserializers.EventNoticeSerializer371class EventTaskNoticeViewSet(base_api.EventTaskNoticeViewSet):...test_cache.py
Source:test_cache.py  
1from lyrebird.mock.cache import ListCache2def test_list_cache():3    cache = ListCache()4    cache.add('1')5    assert cache.items() == ['1']6    cache.clear()7    assert cache.items() == []8def test_list_cache_max():9    cache = ListCache(maxlen=10)10    for i in range(20):11        cache.add(i)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
