How to use stub_class method in autotest

Best Python code snippet using autotest_python Github


Full Screen

1#2# Licensed to the Apache Software Foundation (ASF) under one3# or more contributor license agreements. See the NOTICE file4# distributed with this work for additional information5# regarding copyright ownership. The ASF licenses this file6# to you under the Apache License, Version 2.0 (the7# "License"); you may not use this file except in compliance8# with the License. You may obtain a copy of the License at9#10# Unless required by applicable law or agreed to in writing,13# software distributed under the License is distributed on an14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY15# KIND, either express or implied. See the License for the16# specific language governing permissions and limitations17# under the License.18from typing import Any, Callable, Dict, List, Optional19from airflow.models import BaseOperator20from airflow.providers.grpc.hooks.grpc import GrpcHook21class GrpcOperator(BaseOperator):22 """23 Calls a gRPC endpoint to execute an action24 :param stub_class: The stub client to use for this gRPC call25 :type stub_class: gRPC stub class generated from proto file26 :param call_func: The client function name to call the gRPC endpoint27 :type call_func: gRPC client function name for the endpoint generated from proto file, str28 :param grpc_conn_id: The connection to run the operator against29 :type grpc_conn_id: str30 :param data: The data to pass to the rpc call31 :type data: A dict with key value pairs as kwargs of the call_func32 :param interceptors: A list of gRPC interceptor objects to be used on the channel33 :type interceptors: A list of gRPC interceptor objects, has to be initialized34 :param custom_connection_func: The customized connection function to return channel object35 :type custom_connection_func: A python function that returns channel object, take in36 a connection object, can be a partial function37 :param streaming: A flag to indicate if the call is a streaming call38 :type streaming: boolean39 :param response_callback: The callback function to process the response from gRPC call40 :type response_callback: A python function that process the response from gRPC call,41 takes in response object and context object, context object can be used to perform42 push xcom or other after task actions43 :param log_response: A flag to indicate if we need to log the response44 :type log_response: boolean45 """46 template_fields = ('stub_class', 'call_func', 'data')47 template_fields_renderers = {"data": "py"}48 def __init__(49 self,50 *,51 stub_class: Callable,52 call_func: str,53 grpc_conn_id: str = "grpc_default",54 data: Optional[dict] = None,55 interceptors: Optional[List[Callable]] = None,56 custom_connection_func: Optional[Callable] = None,57 streaming: bool = False,58 response_callback: Optional[Callable] = None,59 log_response: bool = False,60 **kwargs,61 ) -> None:62 super().__init__(**kwargs)63 self.stub_class = stub_class64 self.call_func = call_func65 self.grpc_conn_id = grpc_conn_id66 = data or {}67 self.interceptors = interceptors68 self.custom_connection_func = custom_connection_func69 self.streaming = streaming70 self.log_response = log_response71 self.response_callback = response_callback72 def _get_grpc_hook(self) -> GrpcHook:73 return GrpcHook(74 self.grpc_conn_id,75 interceptors=self.interceptors,76 custom_connection_func=self.custom_connection_func,77 )78 def execute(self, context: Dict) -> None:79 hook = self._get_grpc_hook()80"Calling gRPC service")81 # grpc hook always yield82 responses =, self.call_func, streaming=self.streaming, for response in responses:84 self._handle_response(response, context)85 def _handle_response(self, response: Any, context: Dict) -> None:86 if self.log_response:87 if self.response_callback:...

Full Screen

Full Screen


Source:__init__.pyi Github


Full Screen

1"""2This type stub file was generated by pyright.3"""4import calendar5import datetime6import http.client7import os8import re9import google.auth10import google.auth.transport.requests11from __future__ import absolute_import12from threading import local as Local13from typing import Union14from google.protobuf import duration_pb2, timestamp_pb215"""Shared helpers for Google Cloud packages.16This module is not part of the public API surface.17"""18_NOW = ...19UTC = ...20_EPOCH = ...21_RFC3339_MICROS = ...22_RFC3339_NO_FRACTION = ...23_TIMEONLY_W_MICROS = ...24_TIMEONLY_NO_FRACTION = ...25_RFC3339_NANOS = ...26_USER_ROOT: Union[str, None]27_GCLOUD_CONFIG_FILE = ...28_GCLOUD_CONFIG_SECTION = ...29_GCLOUD_CONFIG_KEY = ...30class _LocalStack(Local):31 """Manage a thread-local LIFO stack of resources.32 Intended for use in :class:``,33 :class:``, etc.34 """35 def __init__(self) -> None: ...36 def __iter__(self):37 """Iterate the stack in LIFO order."""38 ...39 def push(self, resource):40 """Push a resource onto our stack."""41 ...42 def pop(self):43 """Pop a resource from our stack.44 :rtype: object45 :returns: the top-most resource, after removing it.46 :raises IndexError: if the stack is empty.47 """48 ...49 @property50 def top(self):51 """Get the top-most resource52 :rtype: object53 :returns: the top-most item, or None if the stack is empty.54 """55 ...56def make_secure_channel(credentials, user_agent, host, extra_options=...):57 """Makes a secure channel for an RPC service.58 Uses / depends on gRPC.59 :type credentials: :class:`google.auth.credentials.Credentials`60 :param credentials: The OAuth2 Credentials to use for creating61 access tokens.62 :type user_agent: str63 :param user_agent: The user agent to be used with API requests.64 :type host: str65 :param host: The host for the service.66 :type extra_options: tuple67 :param extra_options: (Optional) Extra gRPC options used when creating the68 channel.69 :rtype: :class:`grpc._channel.Channel`70 :returns: gRPC secure channel with credentials attached.71 """72 ...73def make_secure_stub(credentials, user_agent, stub_class, host, extra_options=...):74 """Makes a secure stub for an RPC service.75 Uses / depends on gRPC.76 :type credentials: :class:`google.auth.credentials.Credentials`77 :param credentials: The OAuth2 Credentials to use for creating78 access tokens.79 :type user_agent: str80 :param user_agent: The user agent to be used with API requests.81 :type stub_class: type82 :param stub_class: A gRPC stub type for a given service.83 :type host: str84 :param host: The host for the service.85 :type extra_options: tuple86 :param extra_options: (Optional) Extra gRPC options passed when creating87 the channel.88 :rtype: object, instance of ``stub_class``89 :returns: The stub object used to make gRPC requests to a given API.90 """91 ...92def make_insecure_stub(stub_class, host, port=...):93 """Makes an insecure stub for an RPC service.94 Uses / depends on gRPC.95 :type stub_class: type96 :param stub_class: A gRPC stub type for a given service.97 :type host: str98 :param host: The host for the service. May also include the port99 if ``port`` is unspecified.100 :type port: int101 :param port: (Optional) The port for the service.102 :rtype: object, instance of ``stub_class``103 :returns: The stub object used to make gRPC requests to a given API.104 """...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?