How to use process_key method in keyboard

Best Python code snippet using keyboard

subprocess_with_logger.py

Source:subprocess_with_logger.py Github

copy

Full Screen

1#!/usr/bin/env python2##############################################################################3# Copyright 2017-present, Facebook, Inc.4# All rights reserved.5#6# This source code is licensed under the license found in the7# LICENSE file in the root directory of this source tree.8##############################################################################9from __future__ import absolute_import10from __future__ import division11from __future__ import print_function12from __future__ import unicode_literals13import os14import select15import signal16import subprocess17import sys18import time19from threading import Timer20from .custom_logger import getLogger21from .utilities import (22 setRunStatus,23 getRunStatus,24 setRunTimeout,25 getRunTimeout,26 getRunKilled,27)28def processRun(*args, **kwargs):29 if "process_key" not in kwargs:30 kwargs["process_key"] = ""31 retryCount = 332 if "retry" in kwargs:33 retryCount = kwargs["retry"]34 while retryCount > 0:35 # reset run status overwritting error36 # from prior run37 setRunStatus(0, overwrite=True, key=kwargs["process_key"])38 sleep = kwargs.get("retry_sleep")39 if sleep:40 getLogger().info("Sleeping for {}".format(sleep))41 time.sleep(sleep)42 ret = _processRun(*args, **kwargs)43 # break out if the run succeeded44 if getRunStatus(key=kwargs["process_key"]) == 0:45 if not kwargs.get("silent", False):46 getLogger().info("Process Succeeded: %s", " ".join(*args))47 break48 # don't retry for errors which we know will49 # fail again (ie. timeouts)50 if getRunTimeout():51 getLogger().info("Process Failed: %s", " ".join(*args))52 break53 retryCount -= 154 if retryCount > 0:55 getLogger().info(56 f"Process Failed (will retry {retryCount} more times): {' '.join(*args)}"57 )58 return ret59def _processRun(*args, **kwargs):60 if not kwargs.get("silent", False):61 getLogger().info(">>>>>> Running: %s", " ".join(*args))62 err_output = None63 try:64 run_async = False65 if "async" in kwargs:66 run_async = kwargs["async"]67 non_blocking = False68 if "non_blocking" in kwargs and kwargs["non_blocking"]:69 non_blocking = True70 if non_blocking:71 _Popen(*args, **kwargs)72 return [], None73 timeout = None74 if "timeout" in kwargs:75 timeout = kwargs["timeout"]76 ps = _Popen(*args, **kwargs)77 t = None78 if timeout:79 t = Timer(timeout, _kill, [ps, " ".join(*args), kwargs["process_key"]])80 t.start()81 if run_async:82 # when running the process asyncronously we return the83 # popen object and timer for the timeout as a tuple84 # it is the responsibility of the caller to pass this85 # tuple into processWait in order to gather the output86 # from the process87 return (ps, t), None88 return processWait((ps, t), **kwargs)89 except subprocess.CalledProcessError as e:90 err_output = e.output.decode("utf-8", errors="replace")91 getLogger().error("Command failed: {}".format(err_output))92 except Exception:93 getLogger().error(94 "Unknown exception {}: {}".format(sys.exc_info()[0], " ".join(*args))95 )96 err_output = "{}".format(sys.exc_info()[0])97 setRunStatus(1, key=kwargs["process_key"])98 return [], err_output99def processWait(processAndTimeout, **kwargs):100 try:101 ps, t = processAndTimeout102 process_key = ""103 if "process_key" in kwargs:104 process_key = kwargs["process_key"]105 log_output = False106 if "log_output" in kwargs:107 log_output = kwargs["log_output"]108 ignore_status = False109 if "ignore_status" in kwargs:110 ignore_status = kwargs["ignore_status"]111 patterns = []112 if "patterns" in kwargs:113 patterns = kwargs["patterns"]114 output, match = _getOutput(ps, patterns, process_key=process_key)115 ps.stdout.close()116 if match:117 # if the process is terminated by mathing output,118 # assume the process is executed successfully119 ps.terminate()120 status = 0121 else:122 # wait for the process to terminate or for a kill request123 while not getRunKilled():124 try:125 status = ps.wait(timeout=15.0)126 break127 except subprocess.TimeoutExpired:128 pass129 # check if we exitted loop due to a timeout request130 if getRunKilled():131 getLogger().info("Process was killed at user request")132 ps.terminate()133 status = 0134 if t is not None:135 t.cancel()136 if log_output or (status != 0 and not ignore_status):137 if status != 0 and not ignore_status:138 getLogger().info("Process exited with status: {}".format(status))139 setRunStatus(1, key=process_key)140 if "filter" in kwargs:141 output = _filterOutput(output, kwargs["filter"])142 getLogger().info(143 "\n\nProgram Output:\n{}\n{}\n{}\n".format(144 "=" * 80, "\n".join(output), "=" * 80145 )146 )147 if status == 0 or ignore_status:148 setRunStatus(0, key=process_key)149 return output, None150 else:151 setRunStatus(1, key=process_key)152 return [], "\n".join(output)153 except subprocess.CalledProcessError as e:154 err_output = e.output.decode("utf-8", errors="replace")155 getLogger().error("Command failed: {}".format(err_output))156 except Exception:157 err_output = "{}".format(sys.exc_info()[0])158 getLogger().error("Unknown exception {}".format(sys.exc_info()[0]))159 return [], err_output160def _filterOutput(output, match_list):161 length = len(output)162 for i, line in enumerate(output[::-1]):163 for match in match_list:164 if match in line:165 del output[length - i - 1]166 break167 return output168def _Popen(*args, **kwargs):169 # only allow allowlisted args to be passed into popen170 customArgs = {}171 allowlist = ["env"]172 for arg in allowlist:173 if arg in kwargs:174 customArgs[arg] = kwargs[arg]175 ps = subprocess.Popen(176 *args,177 bufsize=-1,178 stdout=subprocess.PIPE,179 stderr=subprocess.STDOUT,180 universal_newlines=True,181 preexec_fn=os.setsid,182 errors="replace",183 **customArgs,184 )185 # We set the buffer size to system default.186 # this is not really recommended. However, we need to stream the187 # output as they are available. So we do this. But, if the data188 # comes in too fast and there is no time to consume them, the output189 # may be truncated. Now, add a buffer to reduce the problem.190 # will see whether this is indeed an issue later on.191 return ps192def _getOutput(ps, patterns, process_key=""):193 if not isinstance(patterns, list):194 patterns = [patterns]195 poller = select.poll()196 poller.register(ps.stdout)197 lines = []198 match = False199 while not getRunKilled(process_key):200 # Try to get output from binary if possible201 # If not possible then loop202 # and recheck run killed contidion203 if poller.poll(15.0):204 line = ps.stdout.readline()205 else:206 continue207 if not line:208 break209 nline = line.rstrip()210 try:211 # decode the string if decode exists212 decoded_line = nline.decode("utf-8", errors="replace")213 nline = decoded_line214 except Exception:215 pass216 lines.append(nline)217 for pattern in patterns:218 if pattern.match(nline):219 match = True220 break221 if match:222 break223 return lines, match224def _kill(p, cmd, processKey):225 try:226 os.killpg(p.pid, signal.SIGKILL)227 except OSError:228 pass # ignore229 getLogger().error("Process timed out: {}".format(cmd))230 setRunStatus(1, key=processKey)...

Full Screen

Full Screen

config.py

Source:config.py Github

copy

Full Screen

...7logger = wingspan_logger.get_logger()8class Config:9 config = None10 @classmethod11 def process_key(cls, key, config_dict, data_type):12 if data_type == 'str':13 if os.environ.get(key):14 return os.environ[key]15 return config_dict.get(key, '')16 elif data_type == 'int':17 if os.environ.get(key):18 return int(os.environ[key])19 if config_dict.get(key):20 return int(config_dict[key])21 return 022 elif data_type == 'bool':23 if os.environ.get(key):24 return os.environ.get(key).lower() == "true"25 if config_dict.get(key):26 return config_dict.get(key)27 @classmethod28 def set_config(cls):29 if cls.config is None:30 # comment out below line if you don't have pyfiglet31 print(pyfiglet.figlet_format('W I N G S P A N'), flush=True)32 config_dict = {}33 # config file will not be passed in sys.argv.34 # Store it in root(or any location in your local system) and provide the path above.35 # Change your arg indices that was reading config.json accordingly.36 try:37 with open(r"config.json") as json_data:38 config_dict = json.load(json_data)39 except Exception as e:40 logger.info("Config file not read. Error : " + str(e))41 config_dict['input_dir'] = cls.process_key('input_dir', config_dict, 'str')42 config_dict['output_dir'] = cls.process_key('output_dir', config_dict, 'str')43 config_dict['extract_dir'] = cls.process_key('extract_dir', config_dict, 'str')44 config_dict['download_dir'] = cls.process_key('download_dir', config_dict, 'str')45 config_dict['environment'] = cls.process_key('environment', config_dict, 'str')46 config_dict['encryption_secret'] = cls.process_key('encryption_secret', config_dict, 'str')47 48 config_dict['elasticsearch_url'] = cls.process_key('elasticsearch_url', config_dict, 'str')49 config_dict['elasticsearch_ssl_enabled'] = cls.process_key('elasticsearch_ssl_enabled', config_dict, 'bool')50 51 config_dict['cassandra_host'] = []52 config_dict['cassandra_host'].append(cls.process_key('cassandra_ip', config_dict, 'str'))53 config_dict['cassandra_port'] = cls.process_key('cassandra_port', config_dict, 'int')54 config_dict['cassandra_user'] = cls.process_key('cassandra_user', config_dict, 'str')55 config_dict['cassandra_password'] = cls.process_key('cassandra_password', config_dict, 'str')56 config_dict['postgres_general_host'] = cls.process_key('postgres_general_host', config_dict, 'str')57 config_dict['postgres_general_port'] = cls.process_key('postgres_general_port', config_dict, 'int')58 config_dict['postgres_general_user'] = cls.process_key('postgres_general_user', config_dict, 'str')59 config_dict['postgres_general_password'] = cls.process_key('postgres_general_password', config_dict, 'str')60 61 config_dict['postgres_schedulo_host'] = cls.process_key('postgres_schedulo_host', config_dict, 'str')62 config_dict['postgres_schedulo_port'] = cls.process_key('postgres_schedulo_port', config_dict, 'int')63 config_dict['postgres_schedulo_user'] = cls.process_key('postgres_schedulo_user', config_dict, 'str')64 config_dict['postgres_schedulo_password'] = cls.process_key('postgres_schedulo_password', config_dict,65 'str')66 config_dict['postgres_critical_host'] = cls.process_key('postgres_critical_host', config_dict, 'str')67 config_dict['postgres_critical_port'] = cls.process_key('postgres_critical_port', config_dict, 'int')68 config_dict['postgres_critical_user'] = cls.process_key('postgres_critical_user', config_dict, 'str')69 config_dict['postgres_critical_password'] = cls.process_key('postgres_critical_password', config_dict,70 'str')71 config_dict['postgres_pid_host'] = cls.process_key('postgres_pid_host', config_dict, 'str')72 config_dict['postgres_pid_port'] = cls.process_key('postgres_pid_port', config_dict, 'int')73 config_dict['postgres_pid_user'] = cls.process_key('postgres_pid_user', config_dict, 'str')74 config_dict['postgres_pid_password'] = cls.process_key('postgres_pid_password', config_dict, 'str')75 76 config_dict['neo4j_host'] = cls.process_key('neo4j_host', config_dict, 'str')77 config_dict['neo4j_user'] = cls.process_key('neo4j_user', config_dict, 'str')78 config_dict['neo4j_password'] = cls.process_key('neo4j_password', config_dict, 'str')79 80 config_dict['la_external_service'] = cls.process_key('la_external_service', config_dict, 'str')81 config_dict['la_client_id'] = cls.process_key('la_client_id', config_dict, 'str')82 config_dict['la_client_password'] = cls.process_key('la_client_password', config_dict, 'str')83 84 config_dict['s3_analytics'] = cls.process_key('s3_analytics', config_dict, 'str')85 86 config_dict['gateway_auth_url'] = cls.process_key('gateway_auth_url', config_dict, 'str')87 config_dict['gateway_url'] = cls.process_key('gateway_url', config_dict, 'str')88 89 config_dict['oracle_servers'] = cls.process_key('oracle_servers', config_dict, 'str')90 for oracle_server_name in config_dict['oracle_servers'].split(','):91 config_dict['oracle_' + oracle_server_name + '_user'] = cls.process_key(92 'oracle_' + oracle_server_name + '_user', config_dict, 'str')93 config_dict['oracle_' + oracle_server_name + '_password'] = cls.process_key(94 'oracle_' + oracle_server_name + '_password', config_dict, 'str')95 config_dict['oracle_' + oracle_server_name + '_host'] = cls.process_key(96 'oracle_' + oracle_server_name + '_host', config_dict, 'str')97 cls.config = config_dict98 @classmethod99 def get_config(cls):100 cls.set_config()...

Full Screen

Full Screen

ProcessRepository.py

Source:ProcessRepository.py Github

copy

Full Screen

...19 raise MissingOptionError(f'missing option please provide options {PROCESS_KEY}')20 if PROCESS_KEY not in self.options:21 self.log.warning(f'missing option please provide option {PROCESS_KEY}')22 raise MissingOptionError(f'missing option please provide option {PROCESS_KEY}')23 def build_process_key(self, market, process_name):24 return self.process_key.format(market, process_name)25 def store(self, process: Process):26 key = self.build_process_key(process.market, process.name)27 serialized = serialize_process(process)28 self.cache.store(key, serialized)29 def retrieve(self, process_name, market) -> Process:30 key = self.build_process_key(market, process_name)31 return self.__retrieve(key)32 def __retrieve(self, key):33 raw_entity = self.cache.fetch(key, as_type=dict)34 return deserialize_process(raw_entity)35 def retrieve_all(self, market) -> List[Process]:36 all_process_keys = self.process_key.format(market, '*')37 keys = self.cache.get_keys(all_process_keys)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run keyboard automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful