How to use the _worker_setup method in Molotov

Best Python code snippet using molotov_python

test_fmwk.py

Source:test_fmwk.py Github

copy

Full Screen

...272 self.assertEqual(results["FAILED"], 0)273 @dedicatedloop274 def test_setup_exception(self):275 @setup()276 async def _worker_setup(num, args):277 raise Exception("bleh")278 @scenario(weight=100)279 async def test_two(session):280 os.kill(os.getpid(), signal.SIGTERM)281 args = self.get_args()282 results = Runner(args)()283 self.assertEqual(results["OK"], 0)284 self.assertEqual(results["SETUP_FAILED"], 1)285 @dedicatedloop286 def test_global_setup_exception(self):287 @global_setup()288 def _setup(args):289 raise Exception("bleh")290 @scenario(weight=100)291 async def test_two(session):292 os.kill(os.getpid(), signal.SIGTERM)293 args = self.get_args()294 runner = Runner(args)295 self.assertRaises(Exception, runner)296 @dedicatedloop297 def test_teardown_exception(self):298 @teardown()299 def _teardown(args):300 raise Exception("bleh")301 @scenario(weight=100)302 async def test_two(session):303 os.kill(os.getpid(), signal.SIGTERM)304 args = self.get_args()305 results = Runner(args)()306 self.assertEqual(results["FAILED"], 0)307 @dedicatedloop308 def test_setup_not_dict(self):309 @setup()310 async def _worker_setup(num, args):311 return 1312 @scenario(weight=100)313 async def test_two(session):314 os.kill(os.getpid(), signal.SIGTERM)315 args = self.get_args()316 results = Runner(args)()...

Full Screen

Full Screen

entrypoints.py

Source:entrypoints.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
"""
This module patches the Nameko ServiceContainer so that every entrypoint that fires
generates a span with helpful defaults.

The kind, name, attributes and status of the span are determined by the
EntrypointAdapter class. More specialised versions can be provided by passing an
appropriate dictionary as `entrypoint_adapters` when invoking
`NamekoInstrumentor.instrument()`.

For example:

    entrypoint_to_adapter_map = {
        "my.custom.EntrypointType": "my.custom.EntrypointAdapter"
    }

    instrumentor = NamekoInstrumentor()
    instrumentor.instrument(entrypoint_adapters=entrypoint_to_adapter_map)
"""
import inspect
import socket
import warnings
from collections import defaultdict
from functools import partial
from traceback import format_exception
from weakref import WeakKeyDictionary

import nameko.containers
from nameko.utils import get_redacted_args
from opentelemetry import context, trace
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.propagate import extract
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util._time import _time_ns
from wrapt import wrap_function_wrapper

from nameko_opentelemetry import utils
from nameko_opentelemetry.scrubbers import scrub


# Dotted path of each built-in entrypoint type -> dotted path of its adapter.
DEFAULT_ADAPTERS = {
    "nameko.rpc.Rpc": "nameko_opentelemetry.rpc.RpcEntrypointAdapter",
    "nameko.web.handlers.HttpRequestHandler": (
        "nameko_opentelemetry.http.HttpEntrypointAdapter"
    ),
    "nameko.events.EventHandler": (
        "nameko_opentelemetry.events.EventHandlerEntrypointAdapter"
    ),
    "nameko.messaging.Consumer": (
        "nameko_opentelemetry.messaging.ConsumerEntrypointAdapter"
    ),
    "nameko.timer.Timer": "nameko_opentelemetry.timer.TimerEntrypointAdapter",
}

# worker_ctx -> (activation, span, token, adapter); filled in by `worker_setup`
# and consumed by `worker_result`.
active_spans = WeakKeyDictionary()

# Entrypoint class -> adapter class. Entrypoint types that were never
# registered resolve to the default `EntrypointAdapter`.
adapter_types = defaultdict(lambda: EntrypointAdapter)


class EntrypointAdapter:
    """Default entrypoint adapter.

    Used whenever no more specific adapter is registered for the type of the
    entrypoint that fired.
    """

    span_kind = trace.SpanKind.SERVER

    def __init__(self, config):
        self.config = config

    def get_span_name(self, worker_ctx):
        """Return the span name, ``<service name>.<entrypoint method name>``."""
        service = worker_ctx.service_name
        method = worker_ctx.entrypoint.method_name
        return f"{service}.{method}"

    def get_metadata(self, worker_ctx):
        """Return the carrier handed to ``extract`` by `worker_setup`."""
        return worker_ctx.context_data

    def exception_was_expected(self, worker_ctx, exc):
        """Tell whether `exc` is one of the entrypoint's ``expected_exceptions``.

        An entrypoint without that attribute (or with a falsy value) expects
        nothing, so the answer is ``False``.
        """
        declared = getattr(worker_ctx.entrypoint, "expected_exceptions", None)
        return isinstance(exc, declared or tuple())

    def start_span(self, span, worker_ctx):
        """Attach the initial attributes to `span` if it is recording."""
        if not span.is_recording():
            return
        span.set_attributes(self.get_attributes(worker_ctx))

    def end_span(self, span, worker_ctx, result, exc_info):
        """Record the worker outcome (exception or result) and status on `span`.

        Nothing is done when the span is not recording.
        """
        if not span.is_recording():
            return

        if exc_info:
            span.record_exception(
                exc_info[1],
                escaped=True,
                attributes=self.get_exception_attributes(worker_ctx, exc_info),
            )
        else:
            result_attributes = self.get_result_attributes(worker_ctx, result)
            span.set_attributes(result_attributes or {})

        span.set_status(self.get_status(worker_ctx, result, exc_info))

    def get_attributes(self, worker_ctx):
        """Build the attributes set when the span starts.

        Combines the common attributes with whatever the overridable
        `get_header_attributes` and `get_call_args_attributes` hooks return.
        """
        entrypoint = worker_ctx.entrypoint
        pool = worker_ctx.container._worker_pool

        attributes = {
            "service_name": worker_ctx.service_name,
            "entrypoint_type": type(entrypoint).__name__,
            "method_name": entrypoint.method_name,
            "active_workers": pool.running(),
            "available_workers": pool.free(),
        }
        attributes.update(self.get_header_attributes(worker_ctx) or {})

        redacted = bool(getattr(entrypoint, "sensitive_arguments", None))
        if redacted:
            call_args = get_redacted_args(
                entrypoint, *worker_ctx.args, **worker_ctx.kwargs
            )
        else:
            method = getattr(entrypoint.container.service_cls, entrypoint.method_name)
            # Bind against the unbound method, passing None in place of the
            # instance, then drop that placeholder from the mapping.
            call_args = inspect.getcallargs(
                method, None, *worker_ctx.args, **worker_ctx.kwargs
            )
            del call_args["self"]

        call_args_attributes = self.get_call_args_attributes(
            worker_ctx, call_args, redacted
        )
        attributes.update(call_args_attributes or {})
        return attributes

    def get_call_args_attributes(self, worker_ctx, call_args, redacted):
        """Return attributes describing the call arguments.

        Returns ``None`` unless the ``send_request_payloads`` config value is
        truthy. Otherwise the arguments are scrubbed, serialised and truncated
        to ``truncate_max_length``.
        """
        if not self.config.get("send_request_payloads"):
            return None

        serialised = utils.serialise_to_string(scrub(call_args, self.config))
        call_args, truncated = utils.truncate(
            serialised, max_len=self.config.get("truncate_max_length")
        )
        return {
            "call_args": call_args,
            "call_args_truncated": str(truncated),
            "call_args_redacted": str(redacted),
        }

    def get_header_attributes(self, worker_ctx):
        """Return the scrubbed, serialised ``worker_ctx.data``.

        Returns ``None`` unless the ``send_headers`` config value is truthy.
        """
        if not self.config.get("send_headers"):
            return None

        headers = scrub(worker_ctx.data, self.config)
        return {"context_data": utils.serialise_to_string(headers)}

    def get_exception_attributes(self, worker_ctx, exc_info):
        """Additional attributes to save alongside a worker exception."""
        _, exc, _ = exc_info

        try:
            stacktrace = "\n".join(format_exception(*exc_info))
        except Exception:
            # Formatting the traceback must never mask the original error.
            stacktrace = "Exception occurred on stacktrace formatting"

        expected = self.exception_was_expected(worker_ctx, exc)
        return {
            "exception.stacktrace": stacktrace,
            "exception.expected": str(expected),
        }

    def get_result_attributes(self, worker_ctx, result):
        """Attributes describing the entrypoint method result.

        Returns ``None`` unless the ``send_response_payloads`` config value is
        truthy. A falsy result is serialised as the empty string.
        """
        if not self.config.get("send_response_payloads"):
            return None

        scrubbed = scrub(result or "", self.config)
        return {"result": utils.serialise_to_string(scrubbed)}

    def get_status(self, worker_ctx, result, exc_info):
        """Span status for this worker.

        ERROR only for an exception that was not expected; OK otherwise.
        """
        if exc_info:
            _, exc, _ = exc_info
            if not self.exception_was_expected(worker_ctx, exc):
                return Status(
                    StatusCode.ERROR,
                    description=f"{type(exc).__name__}: {exc}",
                )
        return Status(StatusCode.OK)


def adapter_factory(worker_ctx, config):
    """Instantiate the adapter registered for the worker's entrypoint type."""
    entrypoint_type = type(worker_ctx.entrypoint)
    return adapter_types[entrypoint_type](config)


def worker_setup(tracer, config, wrapped, instance, args, kwargs):
    """Wrap nameko.containers.ServiceContainer._worker_setup.

    Creates a new span for each entrypoint that fires. The name of the
    span and its attributes are determined by the entrypoint "adapter"
    that is configured for that entrypoint, or the default implementation.
    """
    (worker_ctx,) = args

    adapter = adapter_factory(worker_ctx, config)
    parent_ctx = extract(adapter.get_metadata(worker_ctx))
    token = context.attach(parent_ctx)

    span = tracer.start_span(
        adapter.get_span_name(worker_ctx),
        kind=adapter.span_kind,
        attributes={"hostname": socket.gethostname()},
        start_time=_time_ns(),
    )

    # The adapter's `end_span` records the exception and sets the status,
    # so both automatic behaviours are switched off here.
    activation = trace.use_span(
        span, record_exception=False, set_status_on_exception=False
    )
    activation.__enter__()

    active_spans[worker_ctx] = (activation, span, token, adapter)
    adapter.start_span(span, worker_ctx)

    wrapped(*args, **kwargs)


def worker_result(tracer, config, wrapped, instance, args, kwargs):
    """Wrap nameko.containers.ServiceContainer._worker_result.

    Finds the existing span for this worker and closes it. Additional
    attributes and status are set by the configured entrypoint adapter.
    """
    worker_ctx, result, exc_info = args

    state = active_spans.pop(worker_ctx, None)
    if not state:
        # something went wrong when starting the span; nothing more to do
        # NOTE(review): this path returns without calling `wrapped` --
        # confirm that skipping the original _worker_result is intended.
        warnings.warn("worker result when no active span")
        return

    activation, span, token, adapter = state
    adapter.end_span(span, worker_ctx, result, exc_info)

    exit_args = (None, None, None) if exc_info is None else exc_info
    activation.__exit__(*exit_args)

    span.end(_time_ns())
    context.detach(token)

    wrapped(*args, **kwargs)


def instrument(tracer, config):
    """Register the entrypoint adapters and patch the ServiceContainer hooks.

    Adapters listed under the ``entrypoint_adapters`` config key override or
    extend `DEFAULT_ADAPTERS`.
    """
    # set up entrypoint adapters
    adapter_paths = DEFAULT_ADAPTERS.copy()
    adapter_paths.update(config.get("entrypoint_adapters", {}))

    for entrypoint_path, adapter_path in adapter_paths.items():
        entrypoint_class = utils.import_by_path(entrypoint_path)
        adapter_class = utils.import_by_path(adapter_path)
        adapter_types[entrypoint_class] = adapter_class

    # apply patches
    patches = (
        ("ServiceContainer._worker_setup", worker_setup),
        ("ServiceContainer._worker_result", worker_result),
    )
    for target, wrapper in patches:
        wrap_function_wrapper(
            "nameko.containers", target, partial(wrapper, tracer, config)
        )


def uninstrument():
    unwrap(nameko.containers.ServiceContainer, "_worker_setup")
    # ... (excerpt truncated here; the rest of this function is not shown)

Full Screen

Full Screen

distributed.py

Source:distributed.py Github

copy

Full Screen

...140 def parse_args(self):141 if self._parsed_args is None:142 self._parsed_args = self._parser.parse_args()143 return self._parsed_args144 def _worker_setup(self):145 torch.distributed.init_process_group(self.backend)146 def entry(self):147 """148 Actual process code should be placed after entry()149 """150 self._is_entry_called = True151 args = self.parse_args()152 # DistributedManager master proc will set an env variable for worker proc153 # if this variable doesn't exist, that means it's the master proc154 # otherwise it's worker process, entry() will be no-op155 TORCHX_FLAG = '_TORCHX_DISTRIBUTED_WORKER_'156 if TORCHX_FLAG in os.environ:157 # entering worker proc158 self._worker_setup()159 return160 # entering master proc161 # master proc will launch worker procs using `subprocess`162 # world size in terms of number of processes163 dist_world_size = args.num_procs * args.num_nodes164 # set PyTorch distributed related environmental variables165 current_env = os.environ.copy()166 current_env["MASTER_ADDR"] = args.master_addr167 current_env["MASTER_PORT"] = str(args.master_port)168 current_env["WORLD_SIZE"] = str(dist_world_size)169 current_env[TORCHX_FLAG] = '0' # dummy value170 procs = []171 for local_rank in range(0, args.num_procs):172 # each process's rank...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Molotov automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful