How to use _run_scripts method in avocado

Best Python code snippet using avocado_python

fal_runner.py

Source:fal_runner.py Github

copy

Full Screen

...41 global_scripts = _get_global_scripts(faldbt, args)42 if args.before:43 _handle_global_scripts(args, global_scripts, faldbt, selector_flags)44 pre_hook_scripts = _get_hooks_for_model(models, faldbt, "pre-hook")45 _run_scripts(args, pre_hook_scripts, faldbt)46 _run_scripts(args, scripts, faldbt)47 else:48 _run_scripts(args, scripts, faldbt)49 post_hook_scripts = _get_hooks_for_model(models, faldbt, "post-hook")50 _run_scripts(args, post_hook_scripts, faldbt)51 _handle_global_scripts(args, global_scripts, faldbt, selector_flags)52def _handle_global_scripts(args: argparse.Namespace,53 global_scripts: List[FalScript],54 faldbt: FalDbt,55 selector_flags: Any) -> None:56 scripts_flag = _scripts_flag(args)57 if not scripts_flag and not selector_flags:58 # run globals when no --script is passed and no selector is passed59 _run_scripts(args, global_scripts, faldbt)60 if (scripts_flag or selector_flags) and args.globals:61 _run_scripts(args, global_scripts, faldbt)62def _run_scripts(args: argparse.Namespace, scripts: List[FalScript], faldbt: FalDbt):63 scheduler = Scheduler(64 [TaskGroup(FalLocalHookTask.from_fal_script(script)) for script in scripts]65 )66 parallel_executor(args, faldbt, scheduler)67 failed_tasks: List[FalLocalHookTask] = [68 group.task for group in scheduler.filter_groups(Status.FAILURE)69 ] # type: ignore70 failed_script_ids = [task.build_fal_script(faldbt).id for task in failed_tasks]71 if failed_script_ids:72 raise RuntimeError(f"Error in scripts {str.join(', ',failed_script_ids)}")73def _scripts_flag(args: argparse.Namespace) -> bool:74 return bool(args.scripts)75def _get_hooks_for_model(76 models: List[DbtModel], faldbt: FalDbt, hook_type: str...

Full Screen

Full Screen

installer.py

Source:installer.py Github

copy

Full Screen

...22 metaconn = self.connection23 self.tracker = Tracker(metaconn)24 if not self.tracker.is_initialized():25 raise PGVIsNotInitialized()26 def _run_scripts(self, package, revision, event, **kwargs):27 kwargs["revision"] = revision28 dirname = os.path.join(package.tmpdir, revision)29 loader = pgv.loader.Loader(dirname)30 for filename in package.scripts(revision, event):31 script = loader.load(filename)32 with self.connection.cursor() as cursor:33 with self.tracker.script(filename):34 logger.info(" run %s", filename)35 cursor.execute(script, kwargs)36 def install(self, package):37 for revision in package.revlist:38 if self.tracker.is_installed(revision):39 logger.debug("revision %s is installed already", revision)40 continue41 logger.info("installing revision %s", revision)42 with self.revision(package, revision):43 directory = os.path.join(package.tmpdir, revision)44 loader = pgv.loader.Loader(directory)45 for schema in package.schemas(revision):46 logger.info(" schema %s", schema)47 with self.schema(package, revision, schema):48 for filename in package.schema_files(revision, schema):49 logger.info(" script %s", filename)50 script = loader.load(filename)51 with self.script(package, revision, schema,52 filename):53 with self.tracker.script(filename):54 with self.connection.cursor() as c:55 c.execute(script)56 if not self.connection.autocommit:57 self.connection.commit()58 def revision(self, package, revision):59 this = self60 schemas = package.schemas(revision)61 files = list(chain(62 *[package.schema_files(revision, x) for x in schemas]))63 scripts = list(chain(64 *[package.scripts(revision, x) for x in Package.events]))65 class RevisionInstaller:66 def __enter__(self):67 this._run_scripts(package, revision, this.events.start)68 def __exit__(self, type, value, tb):69 this._run_scripts(package, revision, this.events.stop)70 if type is None:71 this.tracker.commit(72 revision,73 schemas=schemas,74 files=files,75 scripts=scripts)76 return type is None77 return RevisionInstaller()78 def schema(self, package, revision, schema):79 this = self80 class SchemaInstaller:81 def __enter__(self):82 this._run_scripts(package, revision, this.events.pre,83 schema=schema)84 def __exit__(self, type, value, tb):85 this._run_scripts(package, revision, this.events.post,86 schema=schema)87 return type is None88 return SchemaInstaller()89 def script(self, package, revision, schema, filename):90 this = self91 class ScriptInstaller:92 def __enter__(self):93 pass94 def __exit__(self, type, value, tb):95 if type is None:96 this._run_scripts(package, revision, this.events.success,97 schema=schema, filename=filename)98 else:99 this._run_scripts(package, revision, this.events.error,100 schema=schema, filename=filename)101 return type is None...

Full Screen

Full Screen

jobscripts.py

Source:jobscripts.py Github

copy

Full Screen

...36 default=default)37class JobScripts(JobPre, JobPost):38 name = 'jobscripts'39 description = 'Runs scripts before/after the job is run'40 def _run_scripts(self, kind, scripts_dir, job):41 config = job.config42 if not os.path.isdir(scripts_dir):43 if config.get('plugins.jobscripts.warn_non_existing_dir'):44 LOG_UI.error("Directory configured to hold %s-job scripts "45 "has not been found: %s", kind, scripts_dir)46 return47 dir_list = os.listdir(scripts_dir)48 scripts = [os.path.join(scripts_dir, f) for f in dir_list]49 scripts = [f for f in scripts50 if os.access(f, os.R_OK | os.X_OK)]51 scripts.sort()52 if not scripts:53 return54 env = self._job_to_environment_variables(job)55 non_zero_namespace = 'plugins.jobscripts.warn_non_zero_status'56 warn_non_zero_status = config.get(non_zero_namespace)57 for script in scripts:58 result = process.run(script, ignore_status=True, env=env)59 if (result.exit_status != 0) and warn_non_zero_status:60 LOG_UI.error('%s job script "%s" exited with status "%i"',61 kind.capitalize(), script, result.exit_status)62 @staticmethod63 def _job_to_environment_variables(job):64 env = {}65 env['AVOCADO_JOB_UNIQUE_ID'] = job.unique_id66 env['AVOCADO_JOB_STATUS'] = job.status67 if job.logdir is not None:68 env['AVOCADO_JOB_LOGDIR'] = job.logdir69 return env70 def pre(self, job):71 path = job.config.get('plugins.jobscripts.pre')72 self._run_scripts('pre', path, job)73 def post(self, job):74 path = job.config.get('plugins.jobscripts.post')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful