Best Python code snippet using molotov_python
test_fmwk.py
Source:test_fmwk.py  
...272        self.assertEqual(results["FAILED"], 0)273    @dedicatedloop274    def test_setup_exception(self):275        @setup()276        async def _worker_setup(num, args):277            raise Exception("bleh")278        @scenario(weight=100)279        async def test_two(session):280            os.kill(os.getpid(), signal.SIGTERM)281        args = self.get_args()282        results = Runner(args)()283        self.assertEqual(results["OK"], 0)284        self.assertEqual(results["SETUP_FAILED"], 1)285    @dedicatedloop286    def test_global_setup_exception(self):287        @global_setup()288        def _setup(args):289            raise Exception("bleh")290        @scenario(weight=100)291        async def test_two(session):292            os.kill(os.getpid(), signal.SIGTERM)293        args = self.get_args()294        runner = Runner(args)295        self.assertRaises(Exception, runner)296    @dedicatedloop297    def test_teardown_exception(self):298        @teardown()299        def _teardown(args):300            raise Exception("bleh")301        @scenario(weight=100)302        async def test_two(session):303            os.kill(os.getpid(), signal.SIGTERM)304        args = self.get_args()305        results = Runner(args)()306        self.assertEqual(results["FAILED"], 0)307    @dedicatedloop308    def test_setup_not_dict(self):309        @setup()310        async def _worker_setup(num, args):311            return 1312        @scenario(weight=100)313        async def test_two(session):314            os.kill(os.getpid(), signal.SIGTERM)315        args = self.get_args()316        results = Runner(args)()...entrypoints.py
Source:entrypoints.py  
1# -*- coding: utf-8 -*-2"""3This module patches the Nameko ServiceContainer so that every entrypoint that fires4generates a span with helpful defaults.5The kind, name, attributes and status of the span are determined by the6EntrypointAdapter class. More specialised versions can be provided by passing an7appropriate dictionary as `entrypoint_adapters` when invoking8`NamekoInstrumentor.instrument()`.9For example:10    entrypoint_to_adapter_map = {11        "my.custom.EntrypointType": "my.custom.EntrypointAdapter"12    }13    instrumentor = NamekoInstrumentor()14    instrumentor.instrument(entrypoint_adapters=entrypoint_to_adapter_map)15"""16import inspect17import socket18import warnings19from collections import defaultdict20from functools import partial21from traceback import format_exception22from weakref import WeakKeyDictionary23import nameko.containers24from nameko.utils import get_redacted_args25from opentelemetry import context, trace26from opentelemetry.instrumentation.utils import unwrap27from opentelemetry.propagate import extract28from opentelemetry.trace.status import Status, StatusCode29from opentelemetry.util._time import _time_ns30from wrapt import wrap_function_wrapper31from nameko_opentelemetry import utils32from nameko_opentelemetry.scrubbers import scrub33DEFAULT_ADAPTERS = {34    "nameko.rpc.Rpc": ("nameko_opentelemetry.rpc.RpcEntrypointAdapter"),35    "nameko.web.handlers.HttpRequestHandler": (36        "nameko_opentelemetry.http.HttpEntrypointAdapter"37    ),38    "nameko.events.EventHandler": (39        "nameko_opentelemetry.events.EventHandlerEntrypointAdapter"40    ),41    "nameko.messaging.Consumer": (42        "nameko_opentelemetry.messaging.ConsumerEntrypointAdapter"43    ),44    "nameko.timer.Timer": ("nameko_opentelemetry.timer.TimerEntrypointAdapter"),45}46active_spans = WeakKeyDictionary()47adapter_types = defaultdict(lambda: EntrypointAdapter)48class EntrypointAdapter:49    """ Default entrypoint adapter. 
This implementation is used unless there's50    a more specific adapter set for the firing entrypoint's type.51    """52    span_kind = trace.SpanKind.SERVER53    def __init__(self, config):54        self.config = config55    def get_span_name(self, worker_ctx):56        return f"{worker_ctx.service_name}.{worker_ctx.entrypoint.method_name}"57    def get_metadata(self, worker_ctx):58        return worker_ctx.context_data59    def exception_was_expected(self, worker_ctx, exc):60        expected_exceptions = getattr(61            worker_ctx.entrypoint, "expected_exceptions", None62        )63        expected_exceptions = expected_exceptions or tuple()64        return isinstance(exc, expected_exceptions)65    def start_span(self, span, worker_ctx):66        if span.is_recording():67            span.set_attributes(self.get_attributes(worker_ctx))68    def end_span(self, span, worker_ctx, result, exc_info):69        if span.is_recording():70            if exc_info:71                span.record_exception(72                    exc_info[1],73                    escaped=True,74                    attributes=self.get_exception_attributes(worker_ctx, exc_info),75                )76            else:77                span.set_attributes(78                    self.get_result_attributes(worker_ctx, result) or {}79                )80            status = self.get_status(worker_ctx, result, exc_info)81            span.set_status(status)82    def get_attributes(self, worker_ctx):83        """ Common attributes for most entrypoints, and hooks into subclassable84        implementations to fetch optional attributes.85        """86        entrypoint = worker_ctx.entrypoint87        attributes = {88            "service_name": worker_ctx.service_name,89            "entrypoint_type": type(entrypoint).__name__,90            "method_name": entrypoint.method_name,91            "active_workers": worker_ctx.container._worker_pool.running(),92            "available_workers": 
worker_ctx.container._worker_pool.free(),93        }94        attributes.update(self.get_header_attributes(worker_ctx) or {})95        if getattr(entrypoint, "sensitive_arguments", None):96            call_args = get_redacted_args(97                entrypoint, *worker_ctx.args, **worker_ctx.kwargs98            )99            redacted = True100        else:101            method = getattr(entrypoint.container.service_cls, entrypoint.method_name)102            call_args = inspect.getcallargs(103                method, None, *worker_ctx.args, **worker_ctx.kwargs104            )105            del call_args["self"]106            redacted = False107        attributes.update(108            self.get_call_args_attributes(worker_ctx, call_args, redacted) or {}109        )110        return attributes111    def get_call_args_attributes(self, worker_ctx, call_args, redacted):112        """ ...113        """114        if self.config.get("send_request_payloads"):115            call_args, truncated = utils.truncate(116                utils.serialise_to_string(scrub(call_args, self.config)),117                max_len=self.config.get("truncate_max_length"),118            )119            return {120                "call_args": call_args,121                "call_args_truncated": str(truncated),122                "call_args_redacted": str(redacted),123            }124    def get_header_attributes(self, worker_ctx):125        """ ...126        """127        if self.config.get("send_headers"):128            return {129                "context_data": utils.serialise_to_string(130                    scrub(worker_ctx.data, self.config)131                )132            }133    def get_exception_attributes(self, worker_ctx, exc_info):134        """ Additional attributes to save alongside a worker exception.135        """136        exc_type, exc, _ = exc_info137        try:138            stacktrace = "\n".join(format_exception(*exc_info))139        except Exception:140            stacktrace = 
"Exception occurred on stacktrace formatting"141        return {142            "exception.stacktrace": stacktrace,143            "exception.expected": str(self.exception_was_expected(worker_ctx, exc)),144        }145    def get_result_attributes(self, worker_ctx, result):146        """ Attributes describing the entrypoint method result.147        """148        if self.config.get("send_response_payloads"):149            return {150                "result": utils.serialise_to_string(scrub(result or "", self.config))151            }152    def get_status(self, worker_ctx, result, exc_info):153        """ Span status for this worker.154        """155        if exc_info:156            exc_type, exc, _ = exc_info157            if not self.exception_was_expected(worker_ctx, exc):158                return Status(159                    StatusCode.ERROR,160                    description="{}: {}".format(type(exc).__name__, exc),161                )162        return Status(StatusCode.OK)163def adapter_factory(worker_ctx, config):164    adapter_class = adapter_types[type(worker_ctx.entrypoint)]165    return adapter_class(config)166def worker_setup(tracer, config, wrapped, instance, args, kwargs):167    """ Wrap nameko.containers.ServiceContainer._worker_setup.168    Creates a new span for each entrypoint that fires. 
The name of the169    span and its attributes are determined by the entrypoint "adapter"170    that is configured for that entrypoint, or the default implementation.171    """172    (worker_ctx,) = args173    adapter = adapter_factory(worker_ctx, config)174    ctx = extract(adapter.get_metadata(worker_ctx))175    token = context.attach(ctx)176    span = tracer.start_span(177        adapter.get_span_name(worker_ctx),178        kind=adapter.span_kind,179        attributes={"hostname": socket.gethostname()},180        start_time=_time_ns(),181    )182    # don't automatically record the exception or set status, because183    # we do that in the entrypoint adapter's `end_span` method184    activation = trace.use_span(185        span, record_exception=False, set_status_on_exception=False186    )187    activation.__enter__()188    active_spans[worker_ctx] = (activation, span, token, adapter)189    adapter.start_span(span, worker_ctx)190    wrapped(*args, **kwargs)191def worker_result(tracer, config, wrapped, instance, args, kwargs):192    """ Wrap nameko.containers.ServiceContainer._worker_result.193    Finds the existing span for this worker and closes it. 
Additional194    attributes and status are set by the configured entrypoint adapter.195    """196    (worker_ctx, result, exc_info) = args197    activated = active_spans.pop(worker_ctx, None)198    if not activated:199        # something went wrong when starting the span; nothing more to do200        warnings.warn("worker result when no active span")201        return202    activation, span, token, adapter = activated203    adapter.end_span(span, worker_ctx, result, exc_info)204    if exc_info is None:205        activation.__exit__(None, None, None)206    else:207        activation.__exit__(*exc_info)208    span.end(_time_ns())209    context.detach(token)210    wrapped(*args, **kwargs)211def instrument(tracer, config):212    # set up entrypoint adapters213    adapter_config = DEFAULT_ADAPTERS.copy()214    adapter_config.update(config.get("entrypoint_adapters", {}))215    for entrypoint_path, adapter_path in adapter_config.items():216        entrypoint_class = utils.import_by_path(entrypoint_path)217        adapter_class = utils.import_by_path(adapter_path)218        adapter_types[entrypoint_class] = adapter_class219    # apply patches220    wrap_function_wrapper(221        "nameko.containers",222        "ServiceContainer._worker_setup",223        partial(worker_setup, tracer, config),224    )225    wrap_function_wrapper(226        "nameko.containers",227        "ServiceContainer._worker_result",228        partial(worker_result, tracer, config),229    )230def uninstrument():231    unwrap(nameko.containers.ServiceContainer, "_worker_setup")...distributed.py
Source:distributed.py  
...140    def parse_args(self):141        if self._parsed_args is None:142            self._parsed_args = self._parser.parse_args()143        return self._parsed_args144    def _worker_setup(self):145        torch.distributed.init_process_group(self.backend)146    def entry(self):147        """148        Actual process code should be placed after entry()149        """150        self._is_entry_called = True151        args = self.parse_args()152        # DistributedManager master proc will set an env variable for worker proc153        # if this variable doesn't exist, that means it's the master proc154        # otherwise it's worker process, entry() will be no-op155        TORCHX_FLAG = '_TORCHX_DISTRIBUTED_WORKER_'156        if TORCHX_FLAG in os.environ:157            # entering worker proc158            self._worker_setup()159            return160        # entering master proc161        # master proc will launch worker procs using `subprocess`162        # world size in terms of number of processes163        dist_world_size = args.num_procs * args.num_nodes164        # set PyTorch distributed related environmental variables165        current_env = os.environ.copy()166        current_env["MASTER_ADDR"] = args.master_addr167        current_env["MASTER_PORT"] = str(args.master_port)168        current_env["WORLD_SIZE"] = str(dist_world_size)169        current_env[TORCHX_FLAG] = '0'  # dummy value170        procs = []171        for local_rank in range(0, args.num_procs):172            # each process's rank...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
