Best Python code snippet using avocado_python
views.py
Source:views.py  
from django.db.models import Q
from django.shortcuts import render
from rest_framework.decorators import action

# NOTE(review): CMDBBase / CMDBBaseModelSerializer are still used further down,
# but their module-level imports are commented out. They are imported lazily
# inside the functions that need them (presumably the module-level import was
# disabled to avoid a circular import -- confirm).
# from apps.cmdb.models import CMDBBase
# from apps.cmdb.serializers import CMDBBaseModelSerializer
from base.mixins import BulkCreateModelMixin
from base.response import json_api_response
from base.views import BaseApiView
from base.views import BaseModelViewSet

from .filters import ServiceTreeFilter
from .models import NodeJoinTag
from .models import NodeLinkOperaPermission
from .models import NodeLinkServer
from .models import ServiceTree
from .serializers import NodeJoinTagSerializer
from .serializers import NodeLinkOperaPermissionModelSerializer
from .serializers import NodeLinkServerSerializer
from .serializers import ServiceTreeListSerializer


# Create your views here.
def show_genres(request):
    """Render ``service_tree/genres.html`` with every ServiceTree node as ``genres``."""
    return render(
        request,
        "service_tree/genres.html",
        {'genres': ServiceTree.objects.all()},
    )


class ServiceTreeModelViewSet(BulkCreateModelMixin, BaseModelViewSet):
    """Service tree."""
    queryset = ServiceTree.objects.all()
    serializer_class = ServiceTreeListSerializer
    pagination_class = None
    filterset_class = ServiceTreeFilter

    def get_queryset(self):
        """Return the nodes visible to the requesting user.

        Users named ``admin`` or ``root`` see the whole queryset. Everyone else
        sees only the nodes they are a read/write member of, plus the ancestors
        of those nodes.
        """
        queryset = super().get_queryset()
        user = self.request.user
        if user.username in ("admin", "root"):
            return queryset

        node_pks = set()
        # Write members first, then read members; both grant visibility.
        for memberships in (user.write_member.all(), user.read_member.all()):
            for membership in memberships:
                ancestors = membership.node.get_ancestors(ascending=False, include_self=True)
                node_pks.update(n.pk for n in ancestors)
        return queryset.filter(pk__in=node_pks)

    def list(self, request, *args, **kwargs):
        """Return the visible tree as nested data, starting from the root nodes.

        Every root entry always gets a ``children`` key (possibly empty).
        """
        queryset = self.filter_queryset(self.get_queryset())
        data = self.get_serializer(queryset.filter(parent=None), many=True).data
        for instance in data:
            instance['children'] = self.set_children_nodes(instance["pk"], queryset)
        return json_api_response(code=0, data=data, message="")

    def set_children_nodes(self, pk, qs):
        """Serialize the direct children of node ``pk`` found in ``qs``, recursively.

        A child only gets a ``children`` key when it has children of its own.

        @param pk: primary key of the parent node
        @param qs: queryset the children are looked up in
        @return: serialized list of the direct children (nested)
        """
        # e.g. level 2: ops, ter, prt
        children_data = self.get_serializer(qs.filter(parent=pk), many=True).data
        for child in children_data:
            # e.g. level 3: monkey, api-gw. Recursing directly (instead of an
            # extra .exists() probe followed by the same query) saves one query
            # per non-leaf node; an empty result means "no children".
            grandchildren = self.set_children_nodes(child['pk'], qs)
            if grandchildren:
                child['children'] = grandchildren
        return children_data

    @staticmethod
    def _member_info(user):
        """Return the public fields of a member used by the permission endpoint."""
        return {'id': user.pk, 'username': user.username, 'email': user.email, 'name': user.name}

    @action(methods=["get"], detail=True)
    def opera_permission_member(self, request, pk=None):
        """Return the read/write members of a node and of its ancestors.

        ``read_member`` / ``write_member`` hold the members set on the first
        node yielded by ``get_ancestors(ascending=True, include_self=True)``
        (treated as the node itself); the ``*_ref`` lists hold the members
        collected from the remaining (ancestor) nodes.
        """
        ret = {
            "read_member": [],
            "write_member": [],
            "read_member_ref": [],  # inherited read
            "write_member_ref": [],  # inherited write
        }
        # Current node
        node = self.get_object()
        # Current node followed by its ancestors
        _nodes = node.get_ancestors(ascending=True, include_self=True)
        # OneToOneField
        for idx, n in enumerate(_nodes):
            try:
                link = n.nodelinkoperapermission
            except ServiceTree.nodelinkoperapermission.RelatedObjectDoesNotExist:
                # This node has no permission record; nothing to collect.
                continue
            read_member = [self._member_info(o) for o in link.read_member.all()]
            write_member = [self._member_info(o) for o in link.write_member.all()]
            if idx == 0:
                ret['read_member'] = read_member
                ret['write_member'] = write_member
            else:
                ret['read_member_ref'].extend(read_member)
                ret['write_member_ref'].extend(write_member)
        return json_api_response(code=0, data=ret, message=None)

    @action(methods=["get"], detail=True)
    def server(self, request, pk=None, *args, **kwargs):
        """
        Query the machine info of all leaf nodes under a node.
        /api/v1/service_tree/<pk>/server/?page=1&page_size=100
        /api/v1/service_tree/<pk>/server/?page=1&page_size=100&hostname=<>&private_ip=<>

        ``page`` defaults to 1 and ``page_size`` to 15. Passing an empty
        ``page=`` disables pagination. Non-integer or out-of-range values
        yield a ``code=-1`` response instead of an unhandled exception.

        @param request:
        @param pk:
        @return: ``{"count": <total>, "results": [<serialized servers>]}``
        """
        # Imported lazily: the module-level imports of these names are disabled.
        from apps.cmdb.models import CMDBBase
        from apps.cmdb.serializers import CMDBBaseModelSerializer

        page = request.query_params.get('page', 1)
        page_size = request.query_params.get('page_size', 15)
        is_pagination = bool(page)
        if is_pagination:
            try:
                page, page_size = int(page), int(page_size)
            except (TypeError, ValueError):
                return json_api_response(
                    code=-1, data=None, message="page and page_size must be integers.")
            if page < 1 or page_size < 0:
                return json_api_response(
                    code=-1, data=None, message="page must be >= 1 and page_size must be >= 0.")

        # Current node
        node = self.get_object()
        # All leaf nodes under the current node (the node itself included)
        _leaf_nodes = [n for n in node.get_descendants(include_self=True) if n.is_leaf_node()]

        # OneToOneField: a leaf without a linked NodeLinkServer is simply skipped.
        # (If the relation were a ForeignKey, leaf_node.nodelinkserver_set.all()
        # would have to be iterated instead.)
        cmdb_pks = []
        for leaf_node in _leaf_nodes:
            try:
                link = leaf_node.nodelinkserver
            except NodeLinkServer.DoesNotExist:
                continue
            cmdb_pks.extend(s.pk for s in link.cmdbs.all())

        qs = CMDBBase.objects.filter(pk__in=cmdb_pks, is_deleted=False)
        qs = self.server_filter(qs, request.query_params)
        count = qs.count()
        if is_pagination:
            start = (page - 1) * page_size
            qs = qs[start:start + page_size]
        response = {
            "count": count,
            "results": CMDBBaseModelSerializer(qs, many=True).data,
        }
        return json_api_response(code=0, data=response, message=None)

    @action(methods=["get"], detail=True)
    def tag(self, request, pk=None, *args, **kwargs):
        """
        Query the tags linked to a node.
        /api/v1/service_tree/<pk>/tag/
        - no pagination needed
        {
            "tag": []
            "tag_ref": [[],[]]
        }
        """
        response = {"tag": [], "tag_ref": []}
        # Current node
        node = self.get_object()
        # Tags of the current node plus the tags inherited from its ancestors
        _nodes = node.get_ancestors(ascending=True, include_self=True)
        for idx, n in enumerate(_nodes):
            kvs = [{"key": info.key, "value": info.value, "id": info.pk} for info in n.nodejointag_set.all()]
            if idx == 0:
                response['tag'] = kvs
            elif kvs:
                # Ancestors without tags are left out of tag_ref.
                response['tag_ref'].append(kvs)
        return json_api_response(code=0, data=response, message=None)

    @action(methods=["get"], detail=True)
    def tag_v2(self, request, pk=None, *args, **kwargs):
        """
        Query the tags linked to a node, as key/value mappings.
        /api/v1/service_tree/<pk>/tag_v2/
        - no pagination needed
        @param request:
        @param pk:
        @return:
        {
            "tag": {}
            "tag_ref": [{},{}]
        }
        """
        response = {"tag": {}, "tag_ref": []}
        # Current node
        node = self.get_object()
        # Tags of the current node plus the tags inherited from its ancestors
        _nodes = node.get_ancestors(ascending=True, include_self=True)
        for idx, n in enumerate(_nodes):
            kvs = {info.key: info.value for info in n.nodejointag_set.all()}
            if idx == 0:
                response['tag'] = kvs
            elif kvs:
                # Ancestors without tags are left out of tag_ref.
                response['tag_ref'].append(kvs)
        return json_api_response(code=0, data=response, message=None)

    def server_filter(self, qs, query):
        """Filter a server queryset by the request's query parameters.

        ``default`` does a substring match on hostname OR private_ip and takes
        precedence; otherwise ``hostname`` and ``private_ip`` are exact matches.

        @param qs: queryset to filter
        @param query: mapping of query parameters
        @return: the filtered queryset
        """
        # Global fuzzy search
        dft_v = query.get('default')
        if dft_v:
            return qs.filter(
                Q(hostname__contains=dft_v) |
                Q(private_ip__contains=dft_v)
            )

        # Exact search
        hostname_v = query.get('hostname')
        if hostname_v:
            qs = qs.filter(hostname=hostname_v)
        private_ip_v = query.get('private_ip')
        if private_ip_v:
            qs = qs.filter(private_ip=private_ip_v)
        return qs

    def destroy(self, request, *args, **kwargs):
        """Delete a node, but only when it is a leaf node."""
        ins = self.get_object()
        # Deleting across levels (a node that still has children) is not allowed.
        if not ins.is_leaf_node():
            return json_api_response(code=-1, data=None, message="不允许跨层级删除.")
        return super().destroy(request, *args, **kwargs)


class NodeOperaPermissionModelViewSet(BaseModelViewSet):
    """Service tree: operation permissions linked to a node."""
    queryset = NodeLinkOperaPermission.objects.all()
    serializer_class = NodeLinkOperaPermissionModelSerializer
    pagination_class = None

    def create(self, request, *args, **kwargs):
        """
        Create the permission record when the node has none yet; otherwise add
        the submitted members to the existing record.
        """
        serializer = self.get_serializer(data=request.data)
        if serializer.is_valid():
            self.perform_create(serializer)
            return json_api_response(code=0, data=serializer.data, message=None)

        # NOTE(review): after a failed validation ``serializer.data`` is
        # presumably the submitted (initial) data -- confirm against DRF.
        submitted = serializer.data
        node_id = submitted.get('node')
        if node_id is None:
            return json_api_response(code=-1, data=serializer.errors, message="node is required.")
        try:
            instance = self.queryset.get(node_id=node_id)
        except NodeLinkOperaPermission.DoesNotExist:
            return json_api_response(code=-1, data=None, message="not found.")

        partial = kwargs.pop('partial', True)
        s = self.get_serializer(instance, data=request.data, partial=partial)
        s.is_valid(raise_exception=True)
        try:
            # Either list may be omitted by the client; treat that as "add nothing".
            instance.read_member.add(*(submitted.get('read_member') or []))
            instance.write_member.add(*(submitted.get('write_member') or []))
        except Exception as e:
            return json_api_response(code=-1, data=e.args, message=None)
        return json_api_response(code=0, data=s.data, message=None)

    def destroy(self, request, *args, **kwargs):
        """
        Remove members from a node's permission record.
        /api/v1/service_tree/opera_permission/<node_id>/
        {
            read_member: []
            write_member: []
        }
        """
        data = request.data
        try:
            node = ServiceTree.objects.get(pk=kwargs['pk'])
        except ServiceTree.DoesNotExist:
            return json_api_response(code=-1, data=None, message=f"node {kwargs['pk']} not found")
        try:
            link = node.nodelinkoperapermission
        except ServiceTree.nodelinkoperapermission.RelatedObjectDoesNotExist:
            return json_api_response(
                code=-1, data=None, message=f"node {kwargs['pk']} has no opera permission")
        link.read_member.remove(*data.get('read_member', []))
        link.write_member.remove(*data.get('write_member', []))
        return json_api_response(code=0, data=None, message="删除成功.")


class NodeLinkServerModelViewSet(BaseModelViewSet):
    """
    Service tree: servers linked to a node.
    """
    queryset = NodeLinkServer.objects.all()
    serializer_class = NodeLinkServerSerializer
    pagination_class = None

    def create(self, request, *args, **kwargs):
        """
        Create the node/server link when the node has none yet; otherwise add
        the submitted servers to the existing link.

        When the payload carries ``app_key``, the node is resolved from the
        existing link whose node has that appkey.
        """
        if 'app_key' in request.data:
            try:
                link = self.queryset.get(node__appkey=request.data['app_key'])
            except NodeLinkServer.DoesNotExist:
                return json_api_response(code=-1, data=None, message="app_key not found.")
            request_data = {
                "node": link.node_id,
                "cmdbs": request.data.get('cmdbs', []),
            }
        else:
            request_data = request.data

        serializer = self.get_serializer(data=request_data)
        if serializer.is_valid():
            self.perform_create(serializer)
            return json_api_response(code=0, data=serializer.data, message=None)

        # NOTE(review): after a failed validation ``serializer.data`` is
        # presumably the submitted (initial) data -- confirm against DRF.
        submitted = serializer.data
        node_id = submitted.get('node')
        if node_id is None:
            return json_api_response(code=-1, data=serializer.errors, message="node is required.")
        try:
            instance = self.queryset.get(node_id=node_id)
        except NodeLinkServer.DoesNotExist:
            return json_api_response(code=-1, data=None, message="not found.")

        partial = kwargs.pop('partial', True)
        s = self.get_serializer(instance, data=request_data, partial=partial)
        s.is_valid(raise_exception=True)
        instance.cmdbs.add(*(submitted.get('cmdbs') or []))
        return json_api_response(code=0, data=submitted, message=None)


class NodeLinkTagModelViewSet(BaseModelViewSet):
    """
    Service tree: tags linked to a node.
    https://help.aliyun.com/document_detail/171446.html?spm=5176.2020520101manager.0.0.12914a9cStpRuR
    """
    queryset = NodeJoinTag.objects.all()
    serializer_class = NodeJoinTagSerializer
    pagination_class = None


class UnlinkNodeServerApiView(BaseApiView):
    """
    Unbind servers from a service-tree node.
    delete /api/v1/service_tree/unlink/<node_id>/
    {
        "server_ids": [1, 2, 3]  # cmdb ids
    }
    """

    def delete(self, request, *args, **kwargs):
        """Remove the given cmdb ids from the node's server link."""
        # Imported lazily: the module-level import of this name is disabled.
        from apps.cmdb.models import CMDBBase

        node_id = kwargs['pk']
        # A missing "server_ids" key means there is nothing to unlink.
        server_ids = request.data.get('server_ids', [])
        try:
            node_link_server = NodeLinkServer.objects.get(node_id=node_id)
        except NodeLinkServer.DoesNotExist:
            return json_api_response(code=-1, data=None, message=f"node_id {node_id} not found.")
        instances = CMDBBase.objects.filter(pk__in=server_ids)
        node_link_server.cmdbs.remove(*[x.pk for x in instances])
        return json_api_response(code=0, data=None, message=None)


class ParentNodeInfoApiView(BaseApiView):
    """
    Query all parent nodes of a node.
    /api/v1/service_tree/parents/100/
    """
    # Both lists are empty, so no authentication or permission class is applied here.
    authentication_classes = []
    permission_classes = []

    def get(self, request, pk, *args, **kwargs):
        try:
            node = ServiceTree.objects.get(pk=pk)
        except ServiceTree.DoesNotExist:
            return json_api_response(code=-1, data=None, message=f"pk {pk} not found.")
        s = ServiceTreeListSerializer(node.get_ancestors(ascending=True, include_self=True), many=True)
        # NOTE: the snippet is cut off here ("..."); the rest of this method is not visible.


# --- Page text that followed the truncated snippet ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
