Best Python code snippet using autotest_python
servicemanager_pb2_grpc.py
Source:servicemanager_pb2_grpc.py  
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
#
# Client stub, servicer base class and server-registration helper for the
# `google.api.servicemanagement.v1.ServiceManager` gRPC service.
import grpc
from google.api import service_pb2 as google_dot_api_dot_service__pb2
from google.api.servicemanagement.v1 import resources_pb2 as google_dot_api_dot_servicemanagement_dot_v1_dot_resources__pb2
from google.api.servicemanagement.v1 import servicemanager_pb2 as google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2
from google.longrunning import operations_pb2 as google_dot_longrunning_dot_operations__pb2


class ServiceManagerStub(object):
  """[Google Service Management API](/service-management/overview)

  Client-side stub: the constructor binds one attribute per RPC, each created
  with `channel.unary_unary(...)` for the corresponding method path.
  """

  def __init__(self, channel):
    """Constructor.

    Args:
      channel: A grpc.Channel.
    """
    # Every RPC below is unary-unary. Requests are serialized with the request
    # message's SerializeToString and responses are parsed with the response
    # message's FromString.
    self.ListServices = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/ListServices',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServicesRequest.SerializeToString,
        response_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServicesResponse.FromString,
        )
    self.GetService = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/GetService',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GetServiceRequest.SerializeToString,
        response_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_resources__pb2.ManagedService.FromString,
        )
    self.CreateService = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/CreateService',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.CreateServiceRequest.SerializeToString,
        response_deserializer=google_dot_longrunning_dot_operations__pb2.Operation.FromString,
        )
    self.DeleteService = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/DeleteService',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.DeleteServiceRequest.SerializeToString,
        response_deserializer=google_dot_longrunning_dot_operations__pb2.Operation.FromString,
        )
    self.UndeleteService = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/UndeleteService',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.UndeleteServiceRequest.SerializeToString,
        response_deserializer=google_dot_longrunning_dot_operations__pb2.Operation.FromString,
        )
    self.ListServiceConfigs = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/ListServiceConfigs',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServiceConfigsRequest.SerializeToString,
        response_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServiceConfigsResponse.FromString,
        )
    self.GetServiceConfig = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/GetServiceConfig',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GetServiceConfigRequest.SerializeToString,
        response_deserializer=google_dot_api_dot_service__pb2.Service.FromString,
        )
    self.CreateServiceConfig = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/CreateServiceConfig',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.CreateServiceConfigRequest.SerializeToString,
        response_deserializer=google_dot_api_dot_service__pb2.Service.FromString,
        )
    self.SubmitConfigSource = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/SubmitConfigSource',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.SubmitConfigSourceRequest.SerializeToString,
        response_deserializer=google_dot_longrunning_dot_operations__pb2.Operation.FromString,
        )
    self.ListServiceRollouts = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/ListServiceRollouts',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServiceRolloutsRequest.SerializeToString,
        response_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServiceRolloutsResponse.FromString,
        )
    self.GetServiceRollout = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/GetServiceRollout',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GetServiceRolloutRequest.SerializeToString,
        response_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_resources__pb2.Rollout.FromString,
        )
    self.CreateServiceRollout = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/CreateServiceRollout',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.CreateServiceRolloutRequest.SerializeToString,
        response_deserializer=google_dot_longrunning_dot_operations__pb2.Operation.FromString,
        )
    self.GenerateConfigReport = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/GenerateConfigReport',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GenerateConfigReportRequest.SerializeToString,
        response_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GenerateConfigReportResponse.FromString,
        )
    self.EnableService = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/EnableService',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.EnableServiceRequest.SerializeToString,
        response_deserializer=google_dot_longrunning_dot_operations__pb2.Operation.FromString,
        )
    self.DisableService = channel.unary_unary(
        '/google.api.servicemanagement.v1.ServiceManager/DisableService',
        request_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.DisableServiceRequest.SerializeToString,
        response_deserializer=google_dot_longrunning_dot_operations__pb2.Operation.FromString,
        )


class ServiceManagerServicer(object):
  """[Google Service Management API](/service-management/overview)

  Server-side base class. Every method here is a placeholder: it marks the
  call as UNIMPLEMENTED on `context` and then raises NotImplementedError.
  """

  def ListServices(self, request, context):
    """Lists managed services.

    Returns all public services. For authenticated users, also returns all
    services the calling user has "servicemanagement.services.get" permission
    for.

    **BETA:** If the caller specifies the `consumer_id`, it returns only the
    services enabled on the consumer. The `consumer_id` must have the format
    of "project:{PROJECT-ID}".
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def GetService(self, request, context):
    """Gets a managed service. Authentication is required unless the service is
    public.
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def CreateService(self, request, context):
    """Creates a new managed service.
    Please note one producer project can own no more than 20 services.

    Operation<response: ManagedService>
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def DeleteService(self, request, context):
    """Deletes a managed service. This method will change the service to the
    `Soft-Delete` state for 30 days. Within this period, service producers may
    call [UndeleteService][google.api.servicemanagement.v1.ServiceManager.UndeleteService] to restore the service.
    After 30 days, the service will be permanently deleted.

    Operation<response: google.protobuf.Empty>
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def UndeleteService(self, request, context):
    """Revives a previously deleted managed service. The method restores the
    service using the configuration at the time the service was deleted.
    The target service must exist and must have been deleted within the
    last 30 days.

    Operation<response: UndeleteServiceResponse>
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def ListServiceConfigs(self, request, context):
    """Lists the history of the service configuration for a managed service,
    from the newest to the oldest.
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def GetServiceConfig(self, request, context):
    """Gets a service configuration (version) for a managed service.
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def CreateServiceConfig(self, request, context):
    """Creates a new service configuration (version) for a managed service.
    This method only stores the service configuration. To roll out the service
    configuration to backend systems please call
    [CreateServiceRollout][google.api.servicemanagement.v1.ServiceManager.CreateServiceRollout].
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def SubmitConfigSource(self, request, context):
    """Creates a new service configuration (version) for a managed service based
    on
    user-supplied configuration source files (for example: OpenAPI
    Specification). This method stores the source configurations as well as the
    generated service configuration. To rollout the service configuration to
    other services,
    please call [CreateServiceRollout][google.api.servicemanagement.v1.ServiceManager.CreateServiceRollout].

    Operation<response: SubmitConfigSourceResponse>
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def ListServiceRollouts(self, request, context):
    """Lists the history of the service configuration rollouts for a managed
    service, from the newest to the oldest.
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def GetServiceRollout(self, request, context):
    """Gets a service configuration [rollout][google.api.servicemanagement.v1.Rollout].
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def CreateServiceRollout(self, request, context):
    """Creates a new service configuration rollout. Based on rollout, the
    Google Service Management will roll out the service configurations to
    different backend services. For example, the logging configuration will be
    pushed to Google Cloud Logging.

    Please note that any previous pending and running Rollouts and associated
    Operations will be automatically cancelled so that the latest Rollout will
    not be blocked by previous Rollouts.

    Operation<response: Rollout>
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def GenerateConfigReport(self, request, context):
    """Generates and returns a report (errors, warnings and changes from
    existing configurations) associated with
    GenerateConfigReportRequest.new_value

    If GenerateConfigReportRequest.old_value is specified,
    GenerateConfigReportRequest will contain a single ChangeReport based on the
    comparison between GenerateConfigReportRequest.new_value and
    GenerateConfigReportRequest.old_value.
    If GenerateConfigReportRequest.old_value is not specified, this method
    will compare GenerateConfigReportRequest.new_value with the last pushed
    service configuration.
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def EnableService(self, request, context):
    """Enables a [service][google.api.servicemanagement.v1.ManagedService] for a project, so it can be used
    for the project. See
    [Cloud Auth Guide](https://cloud.google.com/docs/authentication) for
    more information.

    Operation<response: EnableServiceResponse>
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def DisableService(self, request, context):
    """Disables a [service][google.api.servicemanagement.v1.ManagedService] for a project, so it can no longer
    be used for the project. It prevents accidental usage that may cause
    unexpected billing charges or security leaks.

    Operation<response: DisableServiceResponse>
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')


def add_ServiceManagerServicer_to_server(servicer, server):
  """Build the RPC handler table for `servicer`.

  Maps each ServiceManager method name to a unary-unary handler wrapping the
  matching `servicer` method, then wraps the table in a generic handler for
  the 'google.api.servicemanagement.v1.ServiceManager' service.

  NOTE(review): the source is cut off after `generic_handler` is created, so
  how `server` is used is not visible here.
  """
  # Server side mirrors the stub: requests are parsed with FromString and
  # responses are serialized with SerializeToString.
  rpc_method_handlers = {
      'ListServices': grpc.unary_unary_rpc_method_handler(
          servicer.ListServices,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServicesRequest.FromString,
          response_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServicesResponse.SerializeToString,
      ),
      'GetService': grpc.unary_unary_rpc_method_handler(
          servicer.GetService,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GetServiceRequest.FromString,
          response_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_resources__pb2.ManagedService.SerializeToString,
      ),
      'CreateService': grpc.unary_unary_rpc_method_handler(
          servicer.CreateService,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.CreateServiceRequest.FromString,
          response_serializer=google_dot_longrunning_dot_operations__pb2.Operation.SerializeToString,
      ),
      'DeleteService': grpc.unary_unary_rpc_method_handler(
          servicer.DeleteService,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.DeleteServiceRequest.FromString,
          response_serializer=google_dot_longrunning_dot_operations__pb2.Operation.SerializeToString,
      ),
      'UndeleteService': grpc.unary_unary_rpc_method_handler(
          servicer.UndeleteService,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.UndeleteServiceRequest.FromString,
          response_serializer=google_dot_longrunning_dot_operations__pb2.Operation.SerializeToString,
      ),
      'ListServiceConfigs': grpc.unary_unary_rpc_method_handler(
          servicer.ListServiceConfigs,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServiceConfigsRequest.FromString,
          response_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServiceConfigsResponse.SerializeToString,
      ),
      'GetServiceConfig': grpc.unary_unary_rpc_method_handler(
          servicer.GetServiceConfig,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GetServiceConfigRequest.FromString,
          response_serializer=google_dot_api_dot_service__pb2.Service.SerializeToString,
      ),
      'CreateServiceConfig': grpc.unary_unary_rpc_method_handler(
          servicer.CreateServiceConfig,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.CreateServiceConfigRequest.FromString,
          response_serializer=google_dot_api_dot_service__pb2.Service.SerializeToString,
      ),
      'SubmitConfigSource': grpc.unary_unary_rpc_method_handler(
          servicer.SubmitConfigSource,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.SubmitConfigSourceRequest.FromString,
          response_serializer=google_dot_longrunning_dot_operations__pb2.Operation.SerializeToString,
      ),
      'ListServiceRollouts': grpc.unary_unary_rpc_method_handler(
          servicer.ListServiceRollouts,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServiceRolloutsRequest.FromString,
          response_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.ListServiceRolloutsResponse.SerializeToString,
      ),
      'GetServiceRollout': grpc.unary_unary_rpc_method_handler(
          servicer.GetServiceRollout,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GetServiceRolloutRequest.FromString,
          response_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_resources__pb2.Rollout.SerializeToString,
      ),
      'CreateServiceRollout': grpc.unary_unary_rpc_method_handler(
          servicer.CreateServiceRollout,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.CreateServiceRolloutRequest.FromString,
          response_serializer=google_dot_longrunning_dot_operations__pb2.Operation.SerializeToString,
      ),
      'GenerateConfigReport': grpc.unary_unary_rpc_method_handler(
          servicer.GenerateConfigReport,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GenerateConfigReportRequest.FromString,
          response_serializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.GenerateConfigReportResponse.SerializeToString,
      ),
      'EnableService': grpc.unary_unary_rpc_method_handler(
          servicer.EnableService,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.EnableServiceRequest.FromString,
          response_serializer=google_dot_longrunning_dot_operations__pb2.Operation.SerializeToString,
      ),
      'DisableService': grpc.unary_unary_rpc_method_handler(
          servicer.DisableService,
          request_deserializer=google_dot_api_dot_servicemanagement_dot_v1_dot_servicemanager__pb2.DisableServiceRequest.FromString,
          response_serializer=google_dot_longrunning_dot_operations__pb2.Operation.SerializeToString,
      ),
  }
  generic_handler = grpc.method_handlers_generic_handler(
      'google.api.servicemanagement.v1.ServiceManager', rpc_method_handlers)
  # ... (snippet is truncated here in the source page)
# ...windows-service.py
Source:windows-service.py  
# eventlet must patch the standard library before anything else is imported.
import eventlet
eventlet.monkey_patch()
import tempfile
import os
import sys
import time
import traceback

log_location = os.path.join(tempfile.gettempdir(), "pricingmonkey.log")
# A service has no console, so stdout/stderr are redirected to a log file.
# Line buffering (third argument = 1) makes every printed line reach the file
# immediately instead of sitting in a block buffer until the process exits.
sys.stdout = sys.stderr = open(log_location, "a", 1)

import win32serviceutil
import win32service
import win32event
import servicemanager


def _log_info(message):
    """Write an informational message to the log file and the Event Log."""
    print(message)
    servicemanager.LogInfoMsg(message)


def _log_error(error):
    """Report an exception to the log file (with traceback) and the Event Log.

    Must be called from inside an ``except`` block so that
    ``traceback.print_exc()`` has an active exception to print.
    """
    message = "Error: " + str(error)
    print(message)
    # The traceback goes to sys.stderr, i.e. the redirected log file.
    traceback.print_exc()
    servicemanager.LogErrorMsg(message)


class BloombergBridgeService(win32serviceutil.ServiceFramework):
    """Windows service wrapper that runs ``server.main`` in an eventlet greenlet.

    Errors in the service callbacks are logged and swallowed (best effort),
    matching the original behaviour.
    """

    _svc_name_ = "BBApi"
    _svc_display_name_ = "Web API for Bloomberg Market Data"

    def __init__(self, args):
        """Initialise the framework and the stop event.

        Args:
            args: Arguments forwarded to ``ServiceFramework.__init__``.
        """
        # Establish instance state before any call that can fail, so that
        # SvcStop() and main() never see a half-initialised object.
        self.isAlive = True
        self.hWaitStop = None
        try:
            win32serviceutil.ServiceFramework.__init__(self, args)
            self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
            servicemanager.LogInfoMsg("Logging to: " + log_location)
            _log_info("Initialising...")
        except Exception as e:
            _log_error(e)

    def SvcStop(self):
        """Handle a stop request: end the main loop and signal the stop event."""
        # Clear the flag first. Previously it was the last statement inside
        # the try block, so any failure while reporting status or signalling
        # the event left main() looping forever.
        self.isAlive = False
        try:
            _log_info("Stopping...")
            self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
            if self.hWaitStop is not None:
                win32event.SetEvent(self.hWaitStop)
        except Exception as e:
            _log_error(e)

    def SvcDoRun(self):
        """Service entry point: log the start event and run ``main()``."""
        try:
            _log_info("Starting...")
            servicemanager.LogMsg(servicemanager.EVENTLOG_INFORMATION_TYPE,
                                  servicemanager.PYS_SERVICE_STARTED,
                                  (self._svc_name_, ''))
            self.main()
        except Exception as e:
            _log_error(e)

    def main(self):
        """Spawn the server greenlet and idle until ``isAlive`` is cleared."""
        # Imported here rather than at module level, as in the original.
        from server import main as start_server
        eventlet.spawn(start_server)
        _log_info("Started")
        # Polls the flag every 5 seconds. NOTE(review): time.sleep is
        # presumably the eventlet-patched version (monkey_patch() ran at
        # import), which lets the server greenlet run while this loop sleeps.
        while self.isAlive:
            time.sleep(5)


if __name__ == '__main__':
    ...  # remainder of this snippet is truncated in the source page
# ...recipe-286220.py
Source:recipe-286220.py  
"""
From "Python Programming on Win32 Chapter 18 - Windows NT Services".
The Event Log facilities of servicemanager use the name of the executable as the application name.
This means that by default, the application name will be PythonService.
The only way this can be changed is to take a copy of PythonService.exe
and rename it to something of your liking (it's only around 20 KB!).
Alternatively, you may wish to use the Event Log natively, as described later in this chapter.
"""

# Here is another way of changing the source name:
class PyMyService(win32serviceutil.ServiceFramework):
    """Demo service that registers its own name as the Event Log source."""

    _svc_name_ = "PyMyService"
    _svc_display_name_ = "Python Service With The Right Name"

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)

    def SvcDoRun(self):
        """Register the Event Log source, then emit a set of sample events."""
        import servicemanager

        source_name = self._svc_name_
        message_file = servicemanager.__file__

        # Add these two lines and the source will be "PyMyService"
        win32evtlogutil.AddSourceToRegistry(source_name, message_file)
        servicemanager.Initialize(source_name, message_file)

        servicemanager.LogMsg(
            servicemanager.EVENTLOG_INFORMATION_TYPE,
            servicemanager.PYS_SERVICE_STARTED,
            (source_name, ""),
        )

        # Convenience helpers: each of these is logged as event 255.
        servicemanager.LogInfoMsg("Info")
        servicemanager.LogErrorMsg("Error")
        servicemanager.LogWarningMsg("Warn")

        # or: LogMsg with an explicit event ID per severity.
        explicit_events = (
            (servicemanager.EVENTLOG_INFORMATION_TYPE, 0xF000, "Info"),  # Event is 61440
            (servicemanager.EVENTLOG_ERROR_TYPE, 0xF001, "Error"),       # Event is 61441
            (servicemanager.EVENTLOG_WARNING_TYPE, 0xF002, "Warn"),      # Event is 61442
        )
        for event_type, event_id, text in explicit_events:
            servicemanager.LogMsg(event_type, event_id, (text,))
# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub.
The guides range from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
