How to use the load_checks method in refurb

Best Python code snippet using refurb_python

plugin.py

Source:plugin.py Github

copy

Full Screen

import os
import re
import sys
import json
import inspect
import asyncio
import pkgutil
from collections import defaultdict
from importlib import import_module
from typing import Dict, Any, List, Callable
from fastapi import FastAPI, APIRouter
from opa.utils import unique, filter_dict_to_function
from opa import config, log


class BasePlugin:
    """Common base class for every plugin type defined in this module."""

    pass


class Driver(BasePlugin):
    """Base class for drivers backing optional components.

    ``initialize``/``initialize_async`` call ``self.connect()`` and, when the
    subclass defines it, ``self.validate()``; neither is defined here, so
    subclasses are expected to supply them.
    """

    name: str
    instance: Any = None
    opts: Dict[str, Any]
    pm: 'PluginManager'

    def __init__(self, opts=None, load='auto'):
        """Store the driver options and the load mode.

        Args:
            opts: Driver options; ``None`` (or any falsy value) becomes ``{}``.
            load: Load mode. ``'yes'`` makes a failed connection fatal, see
                ``_pre_connection_check``.
        """
        self.opts = opts or {}
        self.load = load

    def _pre_connection_check(self, connectionstatus, load):
        """Decide whether ``validate()`` should run after ``connect()``.

        Args:
            connectionstatus: Value returned by ``connect()``.
            load: Unused; ``self.load`` is consulted instead. Kept so existing
                callers keep working.

        Returns:
            ``True`` when the connection did not fail and the driver defines
            ``validate``; ``False`` otherwise.

        Raises:
            Exception: The connection failed and ``self.load == 'yes'``.
        """
        # Equality (not identity) is kept on purpose so that any value equal
        # to False is treated exactly as before.
        if connectionstatus == False:  # noqa: E712 - if no host found
            if self.load == 'yes':
                raise Exception(
                    f'Connect pre-check failed for {self.name}, as if the host is not there? Options {self.opts}'
                )
            return False
        return hasattr(self, 'validate')

    def initialize(self):
        """Connect synchronously and validate when the pre-check allows it."""
        connectionstatus = self.connect()
        if self._pre_connection_check(connectionstatus, self.load):
            self.validate()

    async def initialize_async(self):
        """Connect asynchronously and validate when the pre-check allows it."""
        connectionstatus = await self.connect()
        if self._pre_connection_check(connectionstatus, self.load):
            await self.validate()

    def get_instance(self):
        """Return ``self.instance`` (``None`` unless something assigned it)."""
        return self.instance


class HookDefinition(BasePlugin):
    """Declares a hook name that ``Hook`` classes may implement."""

    required: bool = False  # post_hook() raises if a required definition is unused
    is_async: bool = False  # must match whether the hook's run() is a coroutine
    _used: bool = False  # set by PluginManager.post_hook()


class Hook(BasePlugin):
    """Implementation of a hook; the highest ``order`` wins in post_hook()."""

    name: str
    order: int = 0


class Setup(BasePlugin):
    """Marker base class for setup plugins, instantiated by run_setup()."""

    ...


def get_defined_plugins(mod, plugin_types=None):
    """Collect the plugin classes defined directly in module ``mod``.

    Args:
        mod: An imported module.
        plugin_types: Subset of ``'hook-definitions'``, ``'hooks'``,
            ``'drivers'`` and ``'setup'`` to collect; defaults to all four.

    Returns:
        A ``defaultdict(list)`` mapping plugin type to the matching classes.
    """
    plugin_types = plugin_types or ['hook-definitions', 'hooks', 'drivers', 'setup']
    returndata = defaultdict(list)
    for name, obj in inspect.getmembers(mod, inspect.isclass):
        if mod.__name__ != obj.__module__:
            # We don't want to load plugins/modules if they are imported,
            # ie 'from ... import AnotherPlugin'
            continue
        if obj is Hook or obj is Driver or obj is Setup:
            continue
        if issubclass(obj, Hook) and 'hooks' in plugin_types:
            returndata['hooks'].append(obj)
        elif issubclass(obj, HookDefinition) and 'hook-definitions' in plugin_types:
            returndata['hook-definitions'].append(obj)
        elif issubclass(obj, Driver) and 'drivers' in plugin_types:
            returndata['drivers'].append(obj)
        elif issubclass(obj, Setup) and 'setup' in plugin_types:
            returndata['setup'].append(obj)
    return returndata


class PluginManager:
    """Registry for drivers, hook definitions, hooks and setup plugins."""

    status: Dict[str, Dict]
    drivers: Dict[str, Driver]
    optional_components: Dict[str, Driver]
    hooks: Dict[str, List[Hook]]
    hook_definitions: Dict[str, Hook]
    store: Dict[str, Any]

    def __init__(self):
        self.status = defaultdict(dict)
        self.drivers = {}
        self.optional_components = {}
        self.hooks = defaultdict(list)
        self.hook_definitions = {}
        self.store = {'task_candidates': []}

    def post_hook(self):
        """Collapse ``self.hooks`` to a single hook instance per hook name.

        For every name the hook with the highest ``order`` is kept (the last
        registered one on ties, because the sort is stable). Afterwards
        ``self.hooks`` maps a name to one hook instead of a list, which is
        what ``call``/``call_async`` rely on.

        Raises:
            Exception: A hook definition marked ``required`` has no hook.
        """
        final_hooks: Dict[str, Hook] = {}
        for hook_name, hooks in self.hooks.items():
            definition = self.hook_definitions.get(hook_name)
            try:
                hook_to_use = sorted(hooks, key=lambda x: x.order)[-1]
            except IndexError:
                # No hook registered under this name.
                continue
            if definition:
                definition._used = True
            final_hooks[hook_name] = hook_to_use
        should_be_used = [
            name
            for name, definition in self.hook_definitions.items()
            if definition.required and not definition._used
        ]
        if should_be_used:
            raise Exception(f'Hooks that should be registered: {should_be_used}')
        self.hooks = final_hooks

    def register_driver(self, driver: Driver):
        """Register ``driver`` under its lower-cased ``name``.

        Registering the very same object twice is a silent no-op.

        Raises:
            Exception: A different driver already uses that name.
        """
        name = driver.name.lower()
        if name in self.drivers:
            if self.drivers[name] is driver:
                # This might happen example if we import a driver-class inside a plugin-file.
                # It should be valid, example if you need it for typing.
                return None
            raise Exception(
                f'Driver with this name ({name}) already exists. CaSe of driver is ignored.'
            )
        log.debug(f'Registered driver {name}')
        self.drivers[name] = driver

    def _preload_drivers(self):
        """Instantiate a driver for each entry of ``config.OPTIONAL_COMPONENTS``.

        Entries whose ``LOAD`` is ``'no'`` are skipped. Every instance is
        stored in ``self.optional_components`` under the lower-cased component
        name and then yielded, so the caller decides how to initialize it.

        Raises:
            Exception: The configured ``DRIVER`` is not registered.
        """
        for name, values in config.OPTIONAL_COMPONENTS.items():
            # The original piped this value (and a literal {'test': 123})
            # into `log`; that debugging residue is removed.
            name = name.lower()
            load = values.get('LOAD', 'auto')
            if load == 'no':
                continue
            drivername = values.get('DRIVER')
            # Drivers are registered under lower-cased names, so look them up
            # the same way; a missing/non-string value falls through to the
            # KeyError handling below.
            lookup_key = drivername.lower() if isinstance(drivername, str) else drivername
            try:
                driver = self.drivers[lookup_key]
            except KeyError:
                raise Exception(
                    f'Invalid driver specified ({drivername}), no way to handle it'
                )
            driverinstance = driver(opts=values.get('OPTS', {}), load=load)
            driverinstance.pm = self
            self.optional_components[name] = driverinstance
            log.info(
                f'Connecting to {name} with driver {drivername}, using {driverinstance.opts}'
            )
            yield driverinstance

    async def load_components(self):
        """Initialize every configured driver, awaiting the async ones."""
        for driverinstance in self._preload_drivers():
            if asyncio.iscoroutinefunction(driverinstance.connect):
                await driverinstance.initialize_async()
            else:
                driverinstance.initialize()

    def load_sync_components_global(self):
        """Initialize only drivers whose ``connect`` is synchronous."""
        for driverinstance in self._preload_drivers():
            if asyncio.iscoroutinefunction(driverinstance.connect):
                log.debug(f'Driver {driverinstance.name} is async, wont load')
            else:
                driverinstance.initialize()

    def register_hook_definition(self, obj):
        """Register a hook-definition class under ``obj.name`` or its class name.

        Raises:
            Exception: A definition with that name already exists.
        """
        try:
            name = obj.name
        except AttributeError:
            name = obj.__name__
        if name in self.hook_definitions:
            raise Exception(
                f'There can only be 1 hook-definition per hook-name. "{name}" is already registered'
            )
        self.hook_definitions[name] = obj

    def register_hook(self, obj):
        """Instantiate hook class ``obj`` and add it to the hooks for its name.

        Raises:
            Exception: No hook definition exists for the name, or the
                sync/async nature of ``obj.run`` contradicts the definition.
        """
        try:
            name = obj.name
        except AttributeError:
            name = obj.__name__
        try:
            hook_definition = self.hook_definitions[name]
        except KeyError:
            raise Exception(
                f'There are no hook-definition for hook "{name}", are you using the correct name?'
            )
        if hook_definition.is_async:
            if not asyncio.iscoroutinefunction(obj.run):
                raise Exception(
                    f'Hook-definition is marked as async but the function is not ({obj.run}).'
                )
        else:
            if asyncio.iscoroutinefunction(obj.run):
                raise Exception(
                    f'Hook-definition is not marked as async, but is.. Mark it as async or make it sync: {obj.run}'
                )
        self.hooks[name].append(obj())

    def run_setup(self, obj, params):
        """Instantiate setup class ``obj`` and keep it in ``self.status``.

        ``params`` is first passed through ``filter_dict_to_function``
        (presumably keeping only the keys ``obj.__init__`` accepts - confirm
        against ``opa.utils``).
        """
        params = filter_dict_to_function(params, obj.__init__)
        name = f'{obj.__module__}.{obj.__name__}'
        self.status[name]['init'] = obj(**params)

    def call(self, name, *args, **kwargs):
        """Run the synchronous hook ``name`` and return its result.

        Only meaningful after ``post_hook()``, which leaves one hook per name.

        Raises:
            Exception: The hook's ``run`` is a coroutine function.
        """
        func = self.hooks[name].run
        if asyncio.iscoroutinefunction(func):
            raise Exception(
                f'The hook function ({func}) is async and should not be called using non-async calls. Call it using "await call_async()"'
            )
        return func(*args, **kwargs)

    async def call_async(self, name, *args, **kwargs):
        """Await the asynchronous hook ``name`` and return its result.

        Raises:
            Exception: The hook's ``run`` is not a coroutine function.
        """
        func = self.hooks[name].run
        if not asyncio.iscoroutinefunction(func):
            raise Exception(
                f'The hook function ({func}) is not async call it using "call(..)"'
            )
        return await func(*args, **kwargs)


def _get_plugindata():
    """
    Plugins are imported from multiple paths with these rules:
    * First with a unique name wins
    * There are multiple matchers, that ALL must return true. They return true if they are NOT set, or if they match "$plugin_path / $plugin_name"
      * PLUGIN_WHITELIST_RE (regex)
      * PLUGIN_WHITELIST_LIST
      * PLUGIN_WHITELIST_TAGS
      * not in PLUGIN_BLACKLIST_LIST
      * not in PLUGIN_BLACKLIST_RE
      * not in PLUGIN_BLACKLIST_TAGS

    Returns a dict with the keys 'plugins_to_load' (plugin type -> classes),
    'task_candidates' (names of loaded packages that contain a tasks.py) and
    'routers' (the `router` attribute of every loaded module that has one).
    Side effect: the plugin paths are appended to sys.path.
    """
    PLUGIN_WHITELIST_RE = re.compile(config.PLUGIN_WHITELIST_RE)
    PLUGIN_BLACKLIST_RE = re.compile(config.PLUGIN_BLACKLIST_RE)
    PLUGIN_WHITELIST_TAGS = set(config.PLUGIN_WHITELIST_TAGS)
    PLUGIN_BLACKLIST_TAGS = set(config.PLUGIN_BLACKLIST_TAGS)
    PLUGIN_PATHS = unique(
        [config.PLUGIN_PATHS]
        if isinstance(config.PLUGIN_PATHS, str)
        else config.PLUGIN_PATHS
    ) + ['/data/opa/plugins']
    log.info(
        'Plugin loading settings:'
        f' plugin-paths: {PLUGIN_PATHS}\n'
        f' whitelist-regex: {PLUGIN_WHITELIST_RE}\n'
        f' whitelist-list: {config.PLUGIN_WHITELIST_LIST}\n'
        f' whitelist-tags: {config.PLUGIN_WHITELIST_TAGS}\n'
        f' blacklist-list: {config.PLUGIN_BLACKLIST_LIST}\n'
        f' blacklist-regex: {PLUGIN_BLACKLIST_RE}\n'
        f' blacklist-tags: {PLUGIN_BLACKLIST_TAGS}\n'
    )
    sys_paths = sys.path + PLUGIN_PATHS
    sys.path = unique(sys_paths)
    plugins_to_load = defaultdict(list)
    task_candidates = []
    routers = []
    # Enforces "first with a unique name wins": import_module() resolves a
    # name only once, so handling the same name again would merely register
    # the same classes a second time.
    loaded_names = set()
    for plugin in pkgutil.iter_modules(PLUGIN_PATHS):
        allow_match = os.path.join(plugin.module_finder.path, plugin.name)
        tasks_candidate = False
        if plugin.ispkg:
            metafile = os.path.join(allow_match, 'meta.json')
            if os.path.exists(os.path.join(allow_match, 'tasks.py')):
                tasks_candidate = True
        else:
            metafile = f'{allow_match}-meta.json'
        log.debug('')
        log.debug(f'Checking if we should load "{allow_match}"')
        if os.path.exists(metafile):
            log.debug(f'Found metafile @ {metafile}')
            # Context manager so the handle is closed deterministically.
            with open(metafile, 'r') as metafile_handle:
                metadata = json.load(metafile_handle)
        else:
            log.debug(f'Metafile @ {metafile} does not exist, using empty metadata')
            metadata = {}
        log.debug(f'Metadata: {metadata}')
        # Only configured matchers add an entry, so an unset matcher never
        # blocks loading (all() of an empty dict's values is True).
        load_checks = {}
        if config.PLUGIN_WHITELIST_LIST:
            load_checks['PLUGIN_WHITELIST_LIST'] = (
                allow_match in config.PLUGIN_WHITELIST_LIST
            )
        if PLUGIN_WHITELIST_RE.pattern:
            load_checks['PLUGIN_WHITELIST_RE'] = bool(
                PLUGIN_WHITELIST_RE.match(allow_match)
            )
        if PLUGIN_WHITELIST_TAGS:
            load_checks['PLUGIN_WHITELIST_TAGS'] = bool(
                PLUGIN_WHITELIST_TAGS & set(metadata.get('tags', []))
            )
        if config.PLUGIN_BLACKLIST_LIST:
            load_checks['PLUGIN_BLACKLIST_LIST'] = (
                allow_match not in config.PLUGIN_BLACKLIST_LIST
            )
        if PLUGIN_BLACKLIST_RE.pattern:
            load_checks['PLUGIN_BLACKLIST_RE'] = not bool(
                PLUGIN_BLACKLIST_RE.match(allow_match)
            )
        if PLUGIN_BLACKLIST_TAGS:
            load_checks['PLUGIN_BLACKLIST_TAGS'] = not bool(
                PLUGIN_BLACKLIST_TAGS & set(metadata.get('tags', []))
            )
        load = all(load_checks.values())
        log.debug(f'Load-checks: {load_checks}, overall({load})')
        if not load:
            continue
        if plugin.name in loaded_names:
            log.debug(f'Plugin "{plugin.name}" is already loaded, skipping "{allow_match}"')
            continue
        loaded_names.add(plugin.name)
        log.info(f'Loading plugin: {plugin.name}')
        # We should consider using lazy-loading instead of import_module.
        # This line imports all .py files, which is problematic for example for tasks.
        # If you import tasks inside an __init__, you will still need a get_component at
        # top of tasks (to mark function as celery tasks). The get_component('celery') won't
        # return anything, since everything is not ready yet.
        # Refactoring this code will probably be a no-go since there are many dependencies
        # in eg the hooking system. Modules needs to be loaded
        mod = import_module(plugin.name)
        if tasks_candidate:
            task_candidates.append(plugin.name)
        defined_plugins = get_defined_plugins(mod)
        for pt in ['hook-definitions', 'hooks', 'drivers', 'setup']:
            plugins_to_load[pt] += defined_plugins[pt]
        if hasattr(mod, 'router'):
            routers.append(mod.router)
    return {
        'plugins_to_load': plugins_to_load,
        'task_candidates': task_candidates,
        'routers': routers,
    }


plugin_manager: PluginManager


def _bootstrap_plugin_manager() -> PluginManager:
    """Create the global plugin manager and register hooks and drivers.

    Shared first half of ``startup`` and ``startup_simple``; components are
    not connected here.
    """
    global plugin_manager
    plugin_manager = PluginManager()
    plugin_manager.store.update(**_get_plugindata())
    to_load = plugin_manager.store['plugins_to_load']
    for hook_definition in to_load['hook-definitions']:
        plugin_manager.register_hook_definition(hook_definition)
    for hook in to_load['hooks']:
        plugin_manager.register_hook(hook)
    plugin_manager.post_hook()
    for driver in to_load['drivers']:
        plugin_manager.register_driver(driver)
    return plugin_manager


async def startup(app):
    """Full startup: load plugins, connect components, wire ``app``.

    Routers exposed by plugins are included in ``app`` and every setup plugin
    is run with ``{'app': app}`` as candidate parameters.
    """
    manager = _bootstrap_plugin_manager()
    await manager.load_components()
    for router in manager.store['routers']:
        app.include_router(router)
    for setup in manager.store['plugins_to_load']['setup']:
        manager.run_setup(setup, {'app': app})


def startup_simple():
    """
    Startup for simple sync apps that want some of the core functionality of opa-stack,
    but not the fastapi app itself. Usefull for things like celery workers
    """
    manager = _bootstrap_plugin_manager()
    manager.load_sync_components_global()


async def shutdown():
    """Shutdown counterpart of ``startup``; currently does nothing."""
    pass


def get_plugin_manager() -> PluginManager:
    """Return the global plugin manager created by a startup function."""
    return plugin_manager


def get_component(name: str):
    """Return the driver instance registered for optional component ``name``.

    Raises:
        Exception: No component with that name has been loaded.
    """
    try:
        return plugin_manager.optional_components[name]
    except KeyError:
        raise Exception(
            f'Component is not defined (yet?) or you are using get_instance() outside of a function... Defined components are: {list(plugin_manager.optional_components.keys())}'
        )


def get_instance(name: str):
    """Return the ``instance`` attribute of component ``name``."""
    return get_component(name).instance


def get_util(name: str):
    """Return the ``util`` attribute of component ``name``."""
    return get_component(name).util


def call_hook(name, *args, **kwargs):
    """Run the synchronous hook ``name`` through the global plugin manager."""
    return plugin_manager.call(name, *args, **kwargs)


async def call_hook_async(name, *args, **kwargs):
    """Await the asynchronous hook ``name`` through the global plugin manager."""
    return await plugin_manager.call_async(name, *args, **kwargs)


def get_router(*args, **kwargs) -> APIRouter: ...  # listing is cut off here on the source page

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

# ... (the listing starts mid-file, inside a call whose opening lines are not shown)
#     allow_headers=["*"],
# )

checkfile_path = 'data/checked.json'
checks = {}


def load_checks():
    """Refresh the module-level ``checks`` mapping.

    Reads ``checkfile_path`` when it exists; otherwise gives every basename
    from ``load_basenames()`` its own all-False reviewer record.
    """
    global checks
    if os.path.exists(checkfile_path):
        with open(checkfile_path) as f:
            checks = json.load(f)
    else:
        default = {"harm": False, "jirsi": False, "judith": False}
        for a in load_basenames():
            # Copy per basename: sharing one dict would make an in-place
            # change to one entry show up in every other entry.
            checks[a] = dict(default)


def save_checks():
    """Write ``checks`` to ``checkfile_path`` as JSON."""
    with open(checkfile_path, 'w') as f:
        return json.dump(checks, f)


@app.get("/")
async def root():
    """Return a static message identifying the backend."""
    return {"message": "This is the backend for the analiticcl-evaluation-tool"}


@app.get("/versions")
async def get_versions():
    """Return the sorted version names found in ``data``.

    A version is the middle part of a ``<basename>.<version>.json`` file name
    (exactly three dot-separated parts).
    """
    versions = set()
    for annotation_file in os.listdir('data'):
        if annotation_file.endswith('.json'):
            parts = annotation_file.split('.')
            if len(parts) == 3:
                versions.add(parts[1])
    return sorted(versions)


@app.get("/basenames")
async def get_basenames():
    """Return the content of the basenames selection file."""
    return load_basenames()


@app.get("/checks")
async def get_checks():
    """Reload and return the ``checks`` mapping."""
    load_checks()
    return checks


def load_basenames():
    """Return the parsed content of ``data/ga-selection-basenames.json``."""
    with open('data/ga-selection-basenames.json') as f:
        return json.load(f)


def load_url_dict():
    """Return the parsed content of ``data/page-selection.json``."""
    with open('data/page-selection.json') as f:
        return json.load(f)


@app.get("/pagedata/{basename}/{version}")
async def get_page_data(basename: str, version: str):
    """Return text, annotations, URL and check state for one page.

    Raises:
        HTTPException: 404 when the text or annotations file is missing.
    """
    transkribus_url = load_url_dict()
    load_checks()
    text_file = f'data/{basename}.txt'
    if not os.path.exists(text_file):
        _file_not_found(text_file)
    with open(text_file, encoding='utf8') as f:
        text = f.read()
    annotations_file = f'data/{basename}.{version}.json'
    if not os.path.exists(annotations_file):
        _file_not_found(annotations_file)
    with open(annotations_file, encoding='utf8') as f:
        annotations = json.load(f)
    return {
        "text": text,
        "annotations": annotations,
        "transkribus_url": transkribus_url[basename],
        "checked": {
            "harm": checks[basename]['harm'],
            "jirsi": checks[basename]['jirsi'],
            "judith": checks[basename]['judith']
        }
    }


class AnnotationUpdateBody(BaseModel):
    """Request body for ``put_annotations``."""

    annotations: List[Dict]
    checked: Dict[str, Any]


@app.put("/annotations/{basename}/{version}")
async def put_annotations(basename: str, version: str, aub: AnnotationUpdateBody):
    """Store the annotations of one page version and its check state."""
    annotations = aub.annotations
    with open(f'data/{basename}.{version}.json', 'w', encoding='utf8') as f:
        json.dump(annotations, f, indent=4)
    # Reload first so saving does not drop the entries of other basenames.
    load_checks()
    checks[basename] = aub.checked
    save_checks()
    return aub


@app.get("/html/{filename}.html", response_class=HTMLResponse)
async def get_html(filename: str):
    """Render ``doc/<filename>.md`` to HTML.

    Raises:
        HTTPException: 404 when the markdown file is missing.
    """
    filepath = f'doc/{filename}.md'
    if not os.path.exists(filepath):
        _file_not_found(filepath)
    with open(filepath, "r", encoding="utf-8") as input_file:
        text = input_file.read()
    return markdown.markdown(text)


def _file_not_found(filepath: str):
    """Raise a 404 ``HTTPException`` naming ``filepath``."""
    raise HTTPException(status_code=404, detail=f"File not found: {filepath}")


if __name__ == '__main__': ...  # listing is cut off here on the source page

Full Screen

Full Screen

scheduler.py

Source:scheduler.py Github

copy

Full Screen

...25 self.misfire_grace_time = misfire_grace_time26 self.max_instances = max_instances27 self.coalesce = coalesce28 self.message_queue.connect()29 def load_checks(self):30 return load_data_to_memory(3000)31 def _verbose(self, message):32 if self.verbose:33 logger.debug(message)34 def _schedule_check(self, check):35 """Schedule check."""36 self.message_queue.send(check)37 def _update_is_required(self):38 """Check if update is required."""39 conn = StrictRedis(host=self.redis_host, port=self.redis_port,40 db=self.redis_db)41 return conn.get(self.cache_update_key)42 def _update_jobs(self):43 """Update jobs.44 Check in cache flag 'update_required' if it's set update checks.45 """46 if not self._update_is_required():47 return48 check_id = 90049 check = generate_check_def(check_id, 1)50 self.scheduler.remove_job(str(check_id))51 self.scheduler.add_job(52 self._schedule_check, 'interval', seconds=check['interval'],53 misfire_grace_time=self.misfire_grace_time,54 max_instances=self.max_instances,55 coalesce=self.coalesce,56 id=str(check_id),57 args=(check, ))58 self._verbose("Updating")59 def run(self):60 """Run scheduler."""61 checks = self.load_checks()62 self.scheduler.add_job(self._update_jobs,63 'interval', seconds=10)64 for k, v in checks.items():65 self.scheduler.add_job(self._schedule_check, 'interval',66 seconds=v['interval'],67 misfire_grace_time=self.misfire_grace_time,68 max_instances=self.max_instances,69 coalesce=self.coalesce,70 id=str(k),71 args=(v,))72 self.scheduler.start()73 self._verbose('Press Ctrl+{0} to exit.'.format(74 'Break' if os.name == 'nt' else 'C'))75 try:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run refurb automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful