How to use the create_dispatch_table method in localstack

Best Python code snippet using localstack_python

skeleton.py

Source:skeleton.py Github

copy

Full Screen

# ... (excerpt from skeleton.py: the module's imports and earlier lines are not shown)

DispatchTable = Dict[str, ServiceRequestHandler]


def create_skeleton(service: Union[str, ServiceModel], delegate: Any):
    """
    Creates a Skeleton for the given service, using a dispatch table built from the delegate.

    :param service: the service model, or a string that is passed to ``load_service`` to obtain it
    :param delegate: the object that is scanned for @handler functions
    :return: a Skeleton for the service and the delegate's dispatch table
    """
    if isinstance(service, str):
        service = load_service(service)

    return Skeleton(service, create_dispatch_table(delegate))


class HandlerAttributes(NamedTuple):
    """
    Holder object of the attributes added to a function by the @handler decorator.
    """

    function_name: str
    operation: str
    pass_context: bool
    expand_parameters: bool


def create_dispatch_table(delegate: object) -> DispatchTable:
    """
    Creates a dispatch table for a given object. First, the entire class tree of the object is scanned to find any
    functions that are decorated with @handler. It then resolves those functions on the delegate.

    :param delegate: the object providing the handler functions
    :return: a dictionary mapping operation names to a ServiceRequestDispatcher wrapping the bound function
    """
    # scan class tree for @handler wrapped functions (reverse class tree so that inherited functions overwrite parent
    # functions)
    handlers: Dict[str, HandlerAttributes] = {}

    for cls in reversed(inspect.getmro(delegate.__class__)):
        if cls is object:
            continue

        for _, fn in inspect.getmembers(cls, inspect.isfunction):
            try:
                # attributes come from operation_marker in @handler wrapper
                handlers[fn.operation] = HandlerAttributes(
                    fn.__name__, fn.operation, fn.pass_context, fn.expand_parameters
                )
            except AttributeError:
                # functions without the handler attributes are not operation handlers, skip them
                pass

    # create dispatch table from operation handlers by resolving bound functions on the delegate
    dispatch_table: DispatchTable = {}
    for handler in handlers.values():
        # resolve the bound function of the delegate
        bound_function = getattr(delegate, handler.function_name)
        # create a dispatcher
        dispatch_table[handler.operation] = ServiceRequestDispatcher(
            bound_function,
            operation=handler.operation,
            pass_context=handler.pass_context,
            expand_parameters=handler.expand_parameters,
        )

    return dispatch_table


class ServiceRequestDispatcher:
    """
    Callable wrapper around a handler function that arranges the call arguments according to the
    ``pass_context`` and ``expand_parameters`` flags.
    """

    fn: Callable
    operation: str
    expand_parameters: bool = True
    pass_context: bool = True

    def __init__(
        self,
        fn: Callable,
        operation: str,
        pass_context: bool = True,
        expand_parameters: bool = True,
    ):
        self.fn = fn
        self.operation = operation
        self.pass_context = pass_context
        self.expand_parameters = expand_parameters

    def __call__(
        self, context: RequestContext, request: ServiceRequest
    ) -> Optional[ServiceResponse]:
        """
        Invokes the wrapped function.

        If ``expand_parameters`` is False, the request is passed as a single positional argument, preceded by
        the context if ``pass_context`` is set. Otherwise the items of the request are passed as keyword
        arguments (keys converted with ``xform_name``) together with a ``context`` keyword argument.

        :param context: the request context
        :param request: the service request, may be None
        :return: whatever the wrapped function returns
        """
        args = []
        kwargs = {}

        if not self.expand_parameters:
            if self.pass_context:
                args.append(context)
            args.append(request)
        else:
            if request is None:
                kwargs = {}
            else:
                kwargs = {xform_name(k): v for k, v in request.items()}
            # NOTE: in this branch the context is always passed, ``pass_context`` is not consulted
            kwargs["context"] = context

        return self.fn(*args, **kwargs)


class Skeleton:
    """
    Holds a service model together with the parser, serializer, and dispatch table used for it.
    """

    service: ServiceModel
    dispatch_table: DispatchTable

    def __init__(self, service: ServiceModel, implementation: Union[Any, DispatchTable]):
        """
        :param service: the service model, used to create the parser and the serializer
        :param implementation: either a ready-made dispatch table (a dict), or an object from which a
            dispatch table is created with ``create_dispatch_table``
        """
        self.service = service
        self.parser = create_parser(service)
        self.serializer = create_serializer(service)

        if isinstance(implementation, dict):
            self.dispatch_table = implementation
        else:
            self.dispatch_table = create_dispatch_table(implementation)

    # NOTE: the excerpt is cut off inside ``invoke``. The remainder of the method is not shown, so the
    # visible fragment is preserved below as comments instead of being completed by guesswork.
    #
    # def invoke(self, context: RequestContext) -> HttpResponse:
    #     if context.operation and context.service_request:
    #         # if the parsed request is already set in the context, re-use them
    #         operation, instance = context.operation, context.service_request
    #     else:
    #         # otherwise parse the incoming HTTPRequest
    #         operation, instance = self.parser.parse(context.request)
    #         context.operation = operation
    #
    #     serializer = self.serializer
    #
    #     try:
    #         # Find the operation's handler in the dispatch table
    #         if operation.name not in self.dispatch_table:
    #             LOG.warning(
    #                 "missing entry in dispatch table for %s.%s",
    #                 ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful