Best Python code snippet using autotest_python
module.py
Source:module.py  
1from mgr_module import MgrModule, CommandResult2import threading3import random4import json5import errno6class Module(MgrModule):7    """8    This module is for testing the ceph-mgr python interface from within9    a running ceph-mgr daemon.10    It implements a sychronous self-test command for calling the functions11    in the MgrModule interface one by one, and a background "workload"12    command for causing the module to perform some thrashing-type13    activities in its serve() thread.14    """15    # These workloads are things that can be requested to run inside the16    # serve() function17    WORKLOAD_COMMAND_SPAM = "command_spam"18    WORKLOAD_THROW_EXCEPTION = "throw_exception"19    SHUTDOWN = "shutdown"20    WORKLOADS = (WORKLOAD_COMMAND_SPAM, WORKLOAD_THROW_EXCEPTION)21    # The test code in qa/ relies on these options existing -- they22    # are of course not really used for anything in the module23    OPTIONS = [24            {'name': 'testkey'},25            {'name': 'testlkey'},26            {'name': 'testnewline'}27    ]28    COMMANDS = [29            {30                "cmd": "mgr self-test run",31                "desc": "Run mgr python interface tests",32                "perm": "rw"33            },34            {35                "cmd": "mgr self-test background start name=workload,type=CephString",36                "desc": "Activate a background workload (one of {0})".format(37                    ", ".join(WORKLOADS)),38                "perm": "rw"39            },40            {41                "cmd": "mgr self-test background stop",42                "desc": "Stop background workload if any is running",43                "perm": "rw"44            },45            {46                "cmd": "mgr self-test config get name=key,type=CephString",47                "desc": "Peek at a configuration value",48                "perm": "rw"49            },50            {51                "cmd": "mgr self-test config get_localized name=key,type=CephString",52                "desc": "Peek at a configuration value (localized variant)",53                "perm": "rw"54            },55            {56                "cmd": "mgr self-test remote",57                "desc": "Test inter-module calls",58                "perm": "rw"59            },60            {61                "cmd": "mgr self-test module name=module,type=CephString",62                "desc": "Run another module's self_test() method",63                "perm": "rw"64            },65            ]66    def __init__(self, *args, **kwargs):67        super(Module, self).__init__(*args, **kwargs)68        self._event = threading.Event()69        self._workload = None70    def handle_command(self, inbuf, command):71        if command['prefix'] == 'mgr self-test run':72            self._self_test()73            return 0, '', 'Self-test succeeded'74        elif command['prefix'] == 'mgr self-test background start':75            if command['workload'] not in self.WORKLOADS:76                return (-errno.EINVAL, '',77                        "Workload not found '{0}'".format(command['workload']))78            self._workload = command['workload']79            self._event.set()80            return 0, '', 'Running `{0}` in background'.format(self._workload)81        elif command['prefix'] == 'mgr self-test background stop':82            if self._workload:83                was_running = self._workload84                self._workload = None85                self._event.set()86                return 0, '', 'Stopping background workload `{0}`'.format(87                        was_running)88            else:89                return 0, '', 'No background workload was running'90        elif command['prefix'] == 'mgr self-test config get':91            return 0, str(self.get_config(command['key'])), ''92        elif command['prefix'] == 'mgr self-test config get_localized':93            return 0, str(self.get_localized_config(command['key'])), ''94        elif command['prefix'] == 'mgr self-test remote':95            self._test_remote_calls()96            return 0, '', 'Successfully called'97        elif command['prefix'] == 'mgr self-test module':98            try:99                r = self.remote(command['module'], "self_test")100            except RuntimeError as e:101                return -1, '', "Test failed: {0}".format(e.message)102            else:103                return 0, str(r), "Self-test OK"104        else:105            return (-errno.EINVAL, '',106                    "Command not found '{0}'".format(command['prefix']))107    def _self_test(self):108        self.log.info("Running self-test procedure...")109        self._self_test_osdmap()110        self._self_test_getters()111        self._self_test_config()112        self._self_test_store()113        self._self_test_misc()114        self._self_test_perf_counters()115    def _self_test_getters(self):116        self.version117        self.get_context()118        self.get_mgr_id()119        # In this function, we will assume that the system is in a steady120        # state, i.e. if a server/service appears in one call, it will121        # not have gone by the time we call another function referring to it122        objects = [123                "fs_map",124                "osdmap_crush_map_text",125                "osd_map",126                "config",127                "mon_map",128                "service_map",129                "osd_metadata",130                "pg_summary",131                "pg_status",132                "pg_dump",133                "df",134                "osd_stats",135                "health",136                "mon_status",137                "mgr_map"138                ]139        for obj in objects:140            assert self.get(obj) is not None141        assert self.get("__OBJ_DNE__") is None142        servers = self.list_servers()143        for server in servers:144            self.get_server(server['hostname'])145        osdmap = self.get('osd_map')146        for o in osdmap['osds']:147            osd_id = o['osd']148            self.get_metadata("osd", str(osd_id))149        self.get_daemon_status("osd", "0")150        #send_command151    def _self_test_config(self):152        # This is not a strong test (can't tell if values really153        # persisted), it's just for the python interface bit.154        self.set_config("testkey", "testvalue")155        assert self.get_config("testkey") == "testvalue"156        self.set_localized_config("testkey", "testvalue")157        assert self.get_localized_config("testkey") == "testvalue"158    def _self_test_store(self):159        existing_keys = set(self.get_store_prefix("test").keys())160        self.set_store("testkey", "testvalue")161        assert self.get_store("testkey") == "testvalue"162        assert sorted(self.get_store_prefix("test").keys()) == sorted(163                list({"testkey"} | existing_keys))164    def _self_test_perf_counters(self):165        self.get_perf_schema("osd", "0")166        self.get_counter("osd", "0", "osd.op")167        #get_counter168        #get_all_perf_coutners169    def _self_test_misc(self):170        self.set_uri("http://this.is.a.test.com")171        self.set_health_checks({})172    def _self_test_osdmap(self):173        osdmap = self.get_osdmap()174        osdmap.get_epoch()175        osdmap.get_crush_version()176        osdmap.dump()177        inc = osdmap.new_incremental()178        osdmap.apply_incremental(inc)179        inc.get_epoch()180        inc.dump()181        crush = osdmap.get_crush()182        crush.dump()183        crush.get_item_name(-1)184        crush.get_item_weight(-1)185        crush.find_takes()186        crush.get_take_weight_osd_map(-1)187        #osdmap.get_pools_by_take()188        #osdmap.calc_pg_upmaps()189        #osdmap.map_pools_pgs_up()190        #inc.set_osd_reweights191        #inc.set_crush_compat_weight_set_weights192        self.log.info("Finished self-test procedure.")193    def _test_remote_calls(self):194        # Test making valid call195        self.remote("influx", "handle_command", "", {"prefix": "influx self-test"})196        # Test calling module that exists but isn't enabled197        mgr_map = self.get("mgr_map")198        all_modules = [m['name'] for m in mgr_map['available_modules']]199        disabled_modules = set(all_modules) - set(mgr_map['modules'])200        disabled_module = list(disabled_modules)[0]201        try:202            self.remote(disabled_module, "handle_command", {"prefix": "influx self-test"})203        except ImportError:204            pass205        else:206            raise RuntimeError("ImportError not raised for disabled module")207        # Test calling module that doesn't exist208        try:209            self.remote("idontexist", "handle_command", {"prefix": "influx self-test"})210        except ImportError:211            pass212        else:213            raise RuntimeError("ImportError not raised for nonexistent module")214        # Test calling method that doesn't exist215        try:216            self.remote("influx", "idontexist", {"prefix": "influx self-test"})217        except NameError:218            pass219        else:220            raise RuntimeError("KeyError not raised")221    def shutdown(self):222        self._workload = self.SHUTDOWN223        self._event.set()224    def _command_spam(self):225        self.log.info("Starting command_spam workload...")226        while not self._event.is_set():227            osdmap = self.get_osdmap()228            dump = osdmap.dump()229            count = len(dump['osds'])230            i = int(random.random() * count)231            w = random.random()232            result = CommandResult('')233            self.send_command(result, 'mon', '', json.dumps({234                'prefix': 'osd reweight',235                'id': i,236                'weight': w237                }), '')238            crush = osdmap.get_crush().dump()239            r, outb, outs = result.wait()240        self._event.clear()241        self.log.info("Ended command_spam workload...")242    def serve(self):243        while True:244            if self._workload == self.WORKLOAD_COMMAND_SPAM:245                self._command_spam()246            elif self._workload == self.SHUTDOWN:247                self.log.info("Shutting down...")248                break249            elif self._workload == self.WORKLOAD_THROW_EXCEPTION:250                raise RuntimeError("Synthetic exception in serve")251            else:252                self.log.info("Waiting for workload request...")253                self._event.wait()...workload_generator.py
Source:workload_generator.py  
...50        self._display_frequency = args.display_frequency51        self.loop = asyncio.get_event_loop()52        self.thread_pool = ThreadPoolExecutor(10)53        asyncio.ensure_future(self._simulator_loop(), loop=self.loop)54    def set_workload(self, workload):55        self._workload = workload56        self._discover_validators()57        self._workload.on_will_start()58    def run(self):59        if self._workload is None:60            raise WorkloadConfigurationError()61        self._time_since_last_check = time.time()62        self.loop.run_forever()63    @asyncio.coroutine64    def _simulator_loop(self):65        while True:66            yield from asyncio.sleep(self._rate)67            with self._lock:68                now = time.time()...depthwise_conv2d.py
Source:depthwise_conv2d.py  
...50    _Schedule(2, 2, 8, 4, False),51    _Schedule(1, 2, 8, 4, True),52    _Schedule(1, 1, 4, 8, True),53]54def _get_workload(data, kernel, stride, padding, out_dtype):55    _, C, IH, IW = [x.value for x in data.shape]56    _, MT, KH, KW = [x.value for x in kernel.shape]57    HPAD, WPAD, _, _ = get_pad_tuple(padding, kernel)58    if isinstance(stride, (tuple, list)):59        HSTR, WSTR = stride60    else:61        HSTR, WSTR = stride, stride62    return _Workload(data.dtype, out_dtype, IH, IW, C, MT, KH, KW, HPAD, WPAD, HSTR, WSTR)63def _schedule(s, data, data_pad, kernel, output, last):64    padding = infer_pad(data, data_pad)65    if data_pad is None:66        stride = infer_stride(data, kernel, output)67    else:68        stride = infer_stride(data_pad, kernel, output)69    wkl = _get_workload(data, kernel, stride, padding, output.dtype)70    if wkl not in _WORKLOADS:71        return s72    # use specified schedule73    sch = _SCHEDULES[_WORKLOADS.index(wkl)]74    H, W = wkl.height, wkl.width75    CN = wkl.channel76    MT = wkl.multiplier77    HK, WK = wkl.hkernel, wkl.wkernel78    HPAD, WPAD = wkl.hpad, wkl.wpad79    HSTR, WSTR = wkl.hstride, wkl.wstride80    VH, VW = sch.vh, sch.vw81    BC = sch.bc82    VC = sch.vc83    TH = H + 2*HPAD...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
