Best Python code snippet using slash
viewset.py
Source:viewset.py  
1from typing import Callable, List, Sequence, Union2from fastapi import APIRouter, Header3from fastapi.params import Depends4from pydantic import BaseModel5from .crudset import BaseCrudSet6__all__ = ['ViewSet', 'CrudViewSet']7supported_methods_names: List[str] = [8    'list', 'retrieve', 'create', 'update', 'partial_update', 'destroy']9class ViewSet:10    """ router: APIRouter = None11    base_path: str = None12    class_tag: str = None13    path_key: str = "id"14    response_model: BaseModel = None15    dependencies: Sequence[Depends] = None16    """17    router: APIRouter = None18    base_path: str = None19    class_tag: str = None20    path_key: str = "id"21    response_model: BaseModel = None22    dependencies: Sequence[Depends] = None23    marked_functions: List = []24    def __init__(self) -> APIRouter:25        self.functions: List[Callable] = []26        self.extra_functions: List[List] = []27        self.execute()28    def get_response_model(self, action: str) -> Union[BaseModel, None]:29        """ if override this method, you can return different response model for different action """30        if self.response_model is not None:31            return self.response_model32        return None33    def get_dependencies(self, action: str) -> Sequence[Depends]:34        """ if override this method, you can return different dependencies for different action """35        if self.dependencies is not None:36            return self.dependencies37        return None38    def execute(self) -> APIRouter:39        if self.router is None:40            self.router = APIRouter()41        if self.base_path is None:42            self.base_path = '/' + self.__class__.__name__.lower()43        if self.class_tag is None:44            self.class_tag = self.__class__.__name__45        for func in supported_methods_names:46            if hasattr(self, func):47                self.functions.append(getattr(self, func))48        for func in self.functions:49            self._register_route(func)50        for func, methods, path in self.find_marked_functions():51            self._register_extra_route(func, methods=methods, path=path)52    def _register_route(self, func: Callable, hidden_params: List[str] = ["self"]):53        # hidden_params TODO: add support for hidden params54        extras = {}55        extras['response_model'] = self.get_response_model(func.__name__)56        extras['dependencies'] = self.get_dependencies(func.__name__)57        if func.__name__ == 'list':58            self.router.add_api_route(self.base_path, func, tags=[59                                      self.class_tag], methods=['GET'], **extras)60        elif func.__name__ == 'retrieve':61            self.router.add_api_route(f"{self.base_path}/\u007b{self.path_key}\u007d", func, tags=[62                                      self.class_tag], methods=['GET'], **extras)63        elif func.__name__ == 'create':64            self.router.add_api_route(self.base_path, func, tags=[65                                      self.class_tag], methods=['POST'], **extras)66        elif func.__name__ == 'update':67            self.router.add_api_route(f"{self.base_path}/\u007b{self.path_key}\u007d", func, tags=[68                                      self.class_tag], methods=['PUT'], **extras)69        elif func.__name__ == 'partial_update':70            self.router.add_api_route(f"{self.base_path}/\u007b{self.path_key}\u007d", func, tags=[71                                      self.class_tag], methods=['PATCH'], **extras)72        elif func.__name__ == 'destroy':73            self.router.add_api_route(f"{self.base_path}/\u007b{self.path_key}\u007d", func, tags=[74                                      self.class_tag], methods=['DELETE'], **extras)75        else:76            print(f"Method {func.__name__} is not supported")77    def _register_extra_route(self, func: Callable, methods: List[str] = ["GET"], path: str = None):78        extras = {}79        extras['response_model'] = self.get_response_model(func.__name__)80        extras['dependencies'] = self.get_dependencies(func.__name__)81        if path is None:82            path = func.__name__83        self.router.add_api_route(f"{self.base_path}{path}", func, tags=[84                                  self.class_tag], methods=methods, **extras)85    @classmethod86    def extra_method(cls, methods: List[str] = ["GET"], path_key: str = None):87        """ if you want to add extra method to the viewset, you can use this decorator """88        def decorator(func):89            cls.marked_functions.append([func, methods, path_key])90            return func91        return decorator92    def find_marked_functions(self):93        for func in dir(self):94            for marked_func in self.marked_functions:95                if func == marked_func[0].__name__:96                    self.extra_functions.append(marked_func)97                    self.marked_functions.remove(marked_func)98                    break99        return self.extra_functions100class CrudViewSet(ViewSet):101    """102    This is the base viewset for CRUD operations.103    """104    crud: BaseCrudSet = None105    model: BaseModel = None106    async_db = False107    def __init__(self):108        assert self.crud is not None, "You must define crud model"109        assert self.model is not None, "You must define model"110        self._crud = self.crud()111        super().__init__()112    async def list(self) -> List[model]:113        if self.async_db:114            return await self._crud.list()115        return self._crud.list()116    async def retrieve(self, id: int) -> model:117        if self.async_db:118            return await self._crud.retrieve(id)119        return self._crud.retrieve(id)120    async def create(self, data: model) -> model:121        if self.async_db:122            return await self._crud.create(data)123        return self._crud.create(data)124    async def update(self, id: int, data: model) -> model:125        if self.async_db:126            return await self._crud.update(id, data)127        return self._crud.update(id, data)128    async def partial_update(self, id: int, data: model) -> model:129        if self.async_db:130            return await self._crud.partial_update(id, data)131        return self._crud.partial_update(id, data)132    async def destroy(self, id: int) -> model:133        if self.async_db:134            return await self._crud.destroy(id)...findCallChains.py
Source:findCallChains.py  
12import idaapi34# tcp_v4_rcv5#START_EA = 0xC1780B106START_EA = 0x000000000002A5E078# mark drop functions9#TARGET_EA = 0xC16FAB0010#TARGET_EA = 0x0807320411#TARGET_EA = 0x0801732012#TARGET_EA = 0x0801FF6713#TARGET_EA = 0x0801033F14#TARGET_EA = 0x0801AE28    # set state TCP_NEW_SYN_RECV15TARGET_EA = 0x0000000000017240 # tcp_drop1617entry_func = idaapi.get_func(START_EA)18entry_func_name = Name(entry_func.startEA) or "Entry"19marked_func = idaapi.get_func(TARGET_EA)20marked_func_name = Name(marked_func.startEA) or "MarkedFunc"2122# debug output23print("Entry Function Address: %x" % START_EA)24print("Entry Function Name: '%s'" % entry_func_name)25print("Marked Instruction Address: %x" % TARGET_EA)26print("Marked Instruction belongs to function '%s'" % marked_func_name)2728# There are 2 steps:29# 1. Find all possible call chains (no recursive, i.e. recursive level = 1)30# 2. Find all possible paths within each function in the call chain3132# 1. Find all possible call chains33# backwards or fowards? or both at the same time?3435all_call_chains = []3637def convert_call_stack_to_call_chain(call_stack, reverse=False):38    call_chain = []39    sorted_call_stack = sorted(call_stack.iteritems(), key=lambda (k,v): v['level'], reverse=reverse)4041    for k, v in sorted_call_stack:42        call_chain.append([k, v['start_ea'], v['call_site'], v['level']])4344    # quickfix45    if reverse:46        for i in range(len(call_chain)):47            call_chain[i][3] = len(call_chain) - 1 - call_chain[i][3] 48    else:49        for i in range(len(call_chain)-1):50            call_chain[i][2] = call_chain[i+1][2]51        call_chain[-1][2] = 05253    return call_chain545556# ~75s57def find_call_chain_forward(curr_func, target_func, call_stack, curr_level):58    global fd59    #fd.write("Current Func: %s\n" % (Name(curr_func.startEA) or "N/A"))60    if curr_func == target_func:61        all_call_chains.append(convert_call_stack_to_call_chain(call_stack))62        return6364    for i in FuncItems(curr_func.startEA):65        for xref in XrefsFrom(i, 0):66            #fd.write("%s calls %s from 0x%x. Type: %d" % (Name(curr_func.startEA), Name(xref.to), i, xref.type))67            #fd.write("\n")68            if xref.type == fl_CN or xref.type == fl_CF:69                callee = idaapi.get_func(xref.to)70                callee_name = Name(callee.startEA) or "N/A" 71                if callee_name not in call_stack:72                    curr_level += 173                    call_stack[callee_name] = {'level': curr_level, 'start_ea': callee.startEA, 'call_site': i}74                    find_call_chain_forward(callee, target_func, call_stack, curr_level)75                    del call_stack[callee_name]76                    curr_level -= 17778# ~25s79def find_call_chain_forward2(curr_func, target_func, call_stack, curr_level):80    global fd81    #fd.write("Current Func: %s\n" % (Name(curr_func.startEA) or "N/A"))82    if curr_func == target_func:83        all_call_chains.append(convert_call_stack_to_call_chain(call_stack))84        return8586    for i in FuncItems(curr_func.startEA):87        for ref in CodeRefsFrom(i, 0):88            callee = idaapi.get_func(ref)89            callee_name = Name(callee.startEA) or "N/A" 90            if callee_name not in call_stack:91                curr_level += 192                call_stack[callee_name] = {'level': curr_level, 'start_ea': callee.startEA, 'call_site': i}93                find_call_chain_forward2(callee, target_func, call_stack, curr_level)94                del call_stack[callee_name]95                curr_level -= 19697# ~1s98def find_call_chain_backward(curr_func, target_func, call_stack, curr_level):99    global fd100    if curr_func == target_func:101        all_call_chains.append(convert_call_stack_to_call_chain(call_stack, True))102        return103104    for xref in XrefsTo(curr_func.startEA, 0):105        if xref.type == fl_CN or xref.type == fl_CF:106            caller = idaapi.get_func(idc.GetFunctionAttr(xref.frm, idc.FUNCATTR_START))107            caller_name = Name(caller.startEA) or "N/A"108            if caller_name not in call_stack:109                curr_level += 1110                call_stack[caller_name] = {'level': curr_level, 'start_ea': caller.startEA, 'call_site': xref.frm}111                find_call_chain_backward(caller, target_func, call_stack, curr_level)112                del call_stack[caller_name]113                curr_level -= 1114115# ~1s116def find_call_chain_backward2(curr_func, target_func, call_stack, curr_level):117    global fd118    if curr_func == target_func:119        all_call_chains.append(convert_call_stack_to_call_chain(call_stack, True))120        return121122    for ref in CodeRefsTo(curr_func.startEA, 0):123        caller = idaapi.get_func(idc.GetFunctionAttr(ref, idc.FUNCATTR_START))124        if caller:125            caller_name = Name(caller.startEA) or "N/A"126            if caller_name not in call_stack:127                curr_level += 1128                call_stack[caller_name] = {'level': curr_level, 'start_ea': caller.startEA, 'call_site': ref}129                find_call_chain_backward2(caller, target_func, call_stack, curr_level)130                del call_stack[caller_name]131                curr_level -= 1132133134file = AskFile(1, "*.txt", "Select output file")135fd = open(file, 'w')136137# forwards138#find_call_chain_forward2(entry_func, marked_func, {entry_func_name: {'level': 0, 'start_ea': START_EA, 'call_site': 0}}, 0)139# backwards140find_call_chain_backward2(marked_func, entry_func, {marked_func_name: {'level': 0, 'start_ea': marked_func.startEA, 'call_site': TARGET_EA}}, 0)141142for cc in all_call_chains:143    func_name, start_ea, call_site, lvl = cc[0]144    fd.write("%s,%x,%x" % (func_name, start_ea, call_site))145    for func_name, start_ea, call_site, lvl in cc[1:]:146        fd.write("|%s,%x,%x" % (func_name, start_ea, call_site))147    fd.write("\n")148149#f = idaapi.FlowChart(get_func(idc.ScreenEA()))150151fd.close()
...test_function_marker.py
Source:test_function_marker.py  
...21    assert function_marker('bla').get_value(func, 1) == 122def test_marker_on_methods(marker):23    class Obj(object):24        @marker25        def marked_func(self):26            pass27        def unmarked_func(self):28            pass29        @classmethod30        def unmarked_classmethod(cls):31            pass32        @staticmethod33        def unmarked_staticmethod(param):34            pass35        @marker36        @classmethod37        def marked_classmethod(cls):38            pass39        @marker40        @staticmethod41        def marked_staticmethod():...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
