Best Python code snippet using slash
workflow_processor.py
Source:workflow_processor.py  
1import sys2import json3import os.path4import traceback5import tempfile6import requests7from ruamel import yaml8# import PandaLogger before idds modules not to change message levels of other modules9from pandacommon.pandalogger.PandaLogger import PandaLogger10from pandacommon.pandalogger.LogWrapper import LogWrapper11from pandaserver.workflow import pcwl_utils12from pandaserver.workflow import workflow_utils13from pandaserver.srvcore.CoreUtils import commands_get_status_output, clean_user_id14from pandaserver.srvcore.MailUtils import MailUtils15from idds.client.clientmanager import ClientManager16from idds.common.utils import get_rest_host17_logger = PandaLogger().getLogger('workflow_processor')18# process workflow19class WorkflowProcessor(object):20    # constructor21    def __init__(self, task_buffer=None, log_stream=None):22        self.taskBuffer = task_buffer23        self.log = _logger24    # process a file25    def process(self, file_name, to_delete=False, test_mode=False, get_log=False, dump_workflow=False):26        try:27            is_fatal = False28            is_OK = True29            request_id = None30            dump_str = None31            with open(file_name) as f:32                ops = json.load(f)33                user_name = clean_user_id(ops["userName"])34                base_platform = ops['data'].get('base_platform')35                for task_type in ops['data']['taskParams']:36                    ops['data']['taskParams'][task_type]['userName'] = user_name37                    if base_platform:38                        ops['data']['taskParams'][task_type]['basePlatform'] = base_platform39                log_token = '< id="{}" test={} outDS={} >'.format(user_name, test_mode, ops['data']['outDS'])40                tmpLog = LogWrapper(self.log, log_token)41                tmpLog.info('start {}'.format(file_name))42                sandbox_url = os.path.join(ops['data']['sourceURL'], 'cache', ops['data']['sandbox'])43                # IO through json files44                ops_file = tempfile.NamedTemporaryFile(delete=False, mode='w')45                json.dump(ops, ops_file)46                ops_file.close()47                # execute main in another process to avoid chdir mess48                tmp_stat, tmp_out = commands_get_status_output("python {} {} '{}' {} {} '{}' {}".format(49                    __file__, sandbox_url, log_token, dump_workflow, ops_file.name,50                    user_name, test_mode))51                if tmp_stat:52                    is_OK = False53                    tmpLog.error('main execution failed with {}:{}'.format(tmp_stat, tmp_out))54                else:55                    with open(tmp_out.split('\n')[-1]) as tmp_out_file:56                        is_OK, is_fatal, request_id, dump_str = json.load(tmp_out_file)57                    try:58                        os.remove(tmp_out)59                    except Exception:60                        pass61                if not get_log:62                    if is_OK:63                        tmpLog.info('is_OK={} request_id={}'.format(is_OK, request_id))64                    else:65                        tmpLog.info('is_OK={} is_fatal={} request_id={}'.format(is_OK, is_fatal, request_id))66                if to_delete or (not test_mode and (is_OK or is_fatal)):67                    dump_str = tmpLog.dumpToString() + dump_str68                    tmpLog.debug('delete {}'.format(file_name))69                    try:70                        os.remove(file_name)71                    except Exception:72                        pass73                    # send notification74                    if not test_mode and self.taskBuffer is not None:75                        toAdder = self.taskBuffer.getEmailAddr(user_name)76                        if toAdder is None or toAdder.startswith('notsend'):77                            tmpLog.debug('skip to send notification since suppressed')78                        else:79                            # message80                            if is_OK:81                                mailSubject = "PANDA Notification for Workflow {}".format(ops['data']['outDS'])82                                mailBody = "Hello,\n\nWorkflow:{} has been accepted with RequestID:{}\n\n".\83                                    format(ops['data']['outDS'], request_id)84                            else:85                                mailSubject = "PANDA WARNING for Workflow={}".format(ops['data']['outDS'])86                                mailBody = "Hello,\n\nWorkflow {} was not accepted\n\n".\87                                    format(ops['data']['outDS'], request_id)88                                mailBody += "Reason : %s\n" % dump_str89                            # send90                            tmpSM = MailUtils().send(toAdder, mailSubject, mailBody)91                            tmpLog.debug('sent message with {}'.format(tmpSM))92        except Exception as e:93            is_OK = False94            tmpLog.error("failed to run with {} {}".format(str(e), traceback.format_exc()))95        if get_log:96            ret_val = {'status': is_OK}97            if is_OK:98                ret_val['log'] = dump_str99            else:100                if dump_str is None:101                    ret_val['log'] = tmpLog.dumpToString()102                else:103                    ret_val['log'] = dump_str104            return ret_val105# execute chdir in another process106def core_exec(sandbox_url, log_token, dump_workflow, ops_file, user_name, test_mode):107    tmpLog = LogWrapper(_logger, log_token)108    is_OK = True109    is_fatal = False110    request_id = None111    if dump_workflow == 'True':112        dump_workflow = True113    else:114        dump_workflow = False115    if test_mode == 'True':116        test_mode = True117    else:118        test_mode = False119    try:120        with open(ops_file) as f:121            ops = json.load(f)122        try:123            os.remove(ops_file)124        except Exception:125            pass126        # go to temp dir127        cur_dir = os.getcwd()128        with tempfile.TemporaryDirectory() as tmp_dirname:129            os.chdir(tmp_dirname)130            # download sandbox131            tmpLog.info('downloading sandbox from {}'.format(sandbox_url))132            with requests.get(sandbox_url, allow_redirects=True, verify=False, stream=True) as r:133                if r.status_code == 400:134                    tmpLog.error("not found")135                    is_fatal = True136                    is_OK = False137                elif r.status_code != 200:138                    tmpLog.error("bad HTTP response {}".format(r.status_code))139                    is_OK = False140                # extract sandbox141                if is_OK:142                    with open(ops['data']['sandbox'], 'wb') as fs:143                        for chunk in r.raw.stream(1024, decode_content=False):144                            if chunk:145                                fs.write(chunk)146                        fs.close()147                        tmp_stat, tmp_out = commands_get_status_output(148                            'tar xvfz {}'.format(ops['data']['sandbox']))149                        if tmp_stat != 0:150                            tmpLog.error(tmp_out)151                            dump_str = 'failed to extract {}'.format(ops['data']['sandbox'])152                            tmpLog.error(dump_str)153                            is_fatal = True154                            is_OK = False155                # parse workflow files156                if is_OK:157                    tmpLog.info('parse workflow')158                    if ops['data']['language'] == 'cwl':159                        nodes, root_in = pcwl_utils.parse_workflow_file(ops['data']['workflowSpecFile'],160                                                                        tmpLog)161                        with open(ops['data']['workflowInputFile']) as workflow_input:162                            data = yaml.safe_load(workflow_input)163                        s_id, t_nodes, nodes = pcwl_utils.resolve_nodes(nodes, root_in, data, 0, set(),164                                                                        ops['data']['outDS'], tmpLog)165                        workflow_utils.set_workflow_outputs(nodes)166                        id_node_map = workflow_utils.get_node_id_map(nodes)167                        [node.resolve_params(ops['data']['taskParams'], id_node_map) for node in nodes]168                        dump_str = "the description was internally converted as follows\n" \169                                   + workflow_utils.dump_nodes(nodes)170                        tmpLog.info(dump_str)171                        for node in nodes:172                            s_check, o_check = node.verify()173                            tmp_str = 'Verification failure in ID:{} {}'.format(node.id, o_check)174                            if not s_check:175                                tmpLog.error(tmp_str)176                                dump_str += tmp_str177                                dump_str += '\n'178                                is_fatal = True179                                is_OK = False180                    else:181                        dump_str = "{} is not supported to describe the workflow"182                        tmpLog.error(dump_str)183                        is_fatal = True184                        is_OK = False185                    # convert to workflow186                    if is_OK:187                        workflow_to_submit, dump_str_list = workflow_utils.convert_nodes_to_workflow(nodes)188                        try:189                            if workflow_to_submit:190                                if not test_mode:191                                    tmpLog.info('submit workflow')192                                    wm = ClientManager(host=get_rest_host())193                                    request_id = wm.submit(workflow_to_submit, username=user_name)194                            else:195                                dump_str = 'workflow is empty'196                                tmpLog.error(dump_str)197                                is_fatal = True198                                is_OK = False199                        except Exception as e:200                            dump_str = 'failed to submit the workflow with {}'.format(str(e))201                            tmpLog.error('{} {}'.format(dump_str, traceback.format_exc()))202                        if dump_workflow:203                            tmpLog.debug('\n' + ''.join(dump_str_list))204        os.chdir(cur_dir)205    except Exception as e:206        is_OK = False207        is_fatal = True208        tmpLog.error("failed to run with {} {}".format(str(e), traceback.format_exc()))209    with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp_json:210        json.dump([is_OK, is_fatal, request_id, tmpLog.dumpToString()], tmp_json)211        print(tmp_json.name)212    sys.exit(0)213if __name__ == "__main__":...setup.py
Source:setup.py  
1from distutils.core import setup, Extension2from os import path3from codecs import open4import os5import sys6is_fatal = False7'/usr/lib/jvm/jre-1.7.0/lib/amd64/server/'8# Both MapR and non-MapR environments require the libjvm.so file9libjvm_dir = os.environ.get('PYCHBASE_LIBJVM_DIR', None)10if libjvm_dir is None:11    sys.stderr.write('WARNING: $PYCHBASE_LIBJVM_DIR not set, trying $JAVA_HOME...\n')12    java_home = os.environ.get('JAVA_HOME', None)13    if java_home is None:14        sys.stderr.write("WARNING: $JAVA_HOME not set, trying '/usr/lib/jvm/jre-1.7.0/'\n")15        java_home = '/usr/lib/jvm/jre-1.7.0/'16    if os.path.isdir(java_home):17        if os.path.isdir(os.path.join(java_home, 'lib', 'amd64', 'server')):18            sys.stderr.write('WARNING: Located %s\n' % os.path.join(java_home, 'lib', 'amd64', 'server'))19            libjvm_dir = os.path.join(java_home, 'lib', 'amd64', 'server')20        elif os.path.isdir(os.path.join(java_home, 'jre', 'lib', 'amd64', 'server')):21            sys.stderr.write('WARNING: Located %s\n' % os.path.join(java_home, 'jre', 'lib', 'amd64', 'server'))22            libjvm_dir = os.path.join(java_home, 'jre', 'lib', 'amd64', 'server')23        else:24            sys.stderr.write("ERROR: Could not detect the directory of libjvm.so from $JAVA_HOME\n")25            is_fatal = True26    else:27        is_fatal = True28else:29    if not os.path.isdir(libjvm_dir):30        sys.stderr.write("ERROR: libjvm directory does not exist '%s' \n" % libjvm_dir)31        is_fatal = True32is_mapr = os.environ.get('PYCHBASE_IS_MAPR', None)33if is_mapr is None:34    sys.stderr.write('WARNING: $PYCHBASE_IS_MAPR not set, defaulting to True. This will not work on Non-MapR environments. \n\tPlease export $PYCHBASE_IS_MAPR=FALSE if this is on Cloudera/etc\n')35    is_mapr = 'TRUE'36else:37    if is_mapr.upper() == 'TRUE':38        is_mapr = True39    elif is_mapr.upper() == 'FALSE':40        is_mapr = False41    else:42        sys.stderr.write("WARNING: $PYCHBASE_IS_MAPR should be 'TRUE' or 'FALSE', not '%s', I am defaulting to TRUE\n" % is_mapr)43define_macros = []44if is_mapr:45    define_macros.append(('PYCHBASE_MAPR', '1'))46libraries = ['jvm']47library_dirs = [libjvm_dir]48if is_mapr:49    libraries.append('MapRClient')50    include_dir = os.environ.get('PYCHBASE_INCLUDE_DIR', None)51    if include_dir is None:52        sys.stderr.write("WARNING: $PYCHBASE_INCLUDE_DIR not set. I am defaulting to '/opt/mapr/include'\n")53        include_dir = '/opt/mapr/include'54    library_dir = os.environ.get('PYCHBASE_LIBRARY_DIR', None)55    if library_dir is None:56        sys.stderr.write("WARNING: $PYCHBASE_LIBRARY_DIR not set. I am defaulting to '/opt/mapr/lib'\n")57        library_dir = '/opt/mapr/lib'58    if not os.path.isdir(include_dir):59        sys.stderr.write("ERROR: $PYCHASE_INCLUDE_DIR '%s' does not exist.\n" % include_dir)60        is_fatal = True61    if not os.path.isdir(library_dir):62        sys.stderr.write("ERROR: $PYCHBASE_LIBRARY_DIR '%s' does not exist.\n" % library_dir)63        is_fatal = True64else:65    libraries.append('hbase')66    include_dir = os.environ.get('PYCHBASE_INCLUDE_DIR', None)67    if include_dir is None:68        sys.stderr.write("ERROR: Non-MapR environments must set the $PYCHBASE_INCLUDE_DIR environment variable.\n")69        is_fatal = True70    else:71        if not os.path.isdir(include_dir):72            sys.stderr.write("ERROR: $PYCHBASE_INCLUDE_DIR '%s' does not exist.\n" % include_dir)73            is_fatal = True74    library_dir = os.environ.get('PYCHBASE_LIBRARY_DIR', None)75    if library_dir is None:76        sys.stderr.write("ERROR: Non-MapR environments must set the $PYCHBASE_LIBRARY_DIR environment variable.\n")77        is_fatal = True78    else:79        if not os.path.isdir(library_dir):80            sys.stderr.write("ERROR: $PYCHBASE_LIBRARY_DIR does '%s' does not exist.\n" % library_dir)81            is_fatal = True82if is_fatal:83    sys.stderr.write('ERROR: Failed to install pychbase due to environment variables being set incorrectly. \n\tPlease read the warning/error messages above and consult the readme\n')84    raise ValueError("Failed to install. Please check the readme")85include_dirs = [include_dir]86library_dirs.append(library_dir)87print("include_dirs is %s" % include_dirs)88print("library_dirs is %s" % library_dirs)89print("libraries is %s" % libraries)90here = path.abspath(path.dirname(__file__))91long_description = 'A Python wrapper for the libhbase C API to HBase'92try:93    import requests94    def read_md(file_name):95        try:96            with open(file_name, 'r') as f:97                contents = f.read()98        except IOError as ex:99            return long_description100        try:101            r = requests.post(url='http://c.docverter.com/convert',102                              data={'to': 'rst', 'from': 'markdown'},103                              files={'input_files[]': contents})104            if not r.ok:105                raise Exception(r.text)106            return r.text107        except Exception as ex:108            print ex109            return open(file_name, 'r').read()110except ImportError:111    def read_md(file_name):112        try:113            with open(file_name, 'r') as f:114                print("requests module not available-- cannot convert MD to RST")115                return f.read()116        except IOError:117            return long_description118module1 = Extension('pychbase._pychbase',119                    sources=['pychbase.cc'],120                    include_dirs=include_dirs,121                    libraries=libraries,122                    library_dirs=library_dirs,123                    define_macros=define_macros)124setup(name='pychbase',125      version='0.1.8',126      description=long_description,127      long_description=read_md('README.md'),128      url='https://github.com/mkmoisen/pychbase',129      download_url='https://github.com/mkmoisen/pychbase/tarball/0.1',130      author='Matthew Moisen',131      author_email='mmoisen@cisco.com',132      license='MIT',133      classifiers=[134          'Development Status :: 3 - Alpha',135          'Intended Audience :: Developers',136          'License :: OSI Approved :: MIT License',137          'Programming Language :: Python :: 2.7',138          'Programming Language :: Python :: Implementation :: CPython',139          'Natural Language :: English',140          'Operating System :: POSIX :: Linux',141          'Topic :: Database',142      ],143      keywords='hbase libhbase',144      ext_modules=[module1],...exceptions.py
Source:exceptions.py  
2class SENinjaError(Exception):3    def __init__(self, msg):4        self.message = msg5        super().__init__(msg)6    def is_fatal(self):7        raise NotImplementedError  # override8class DivByZero(SENinjaError):9    def __init__(self, pc):10        self.pc = pc11        self.message = "division by zero occurred at 0x%x" % pc12        super().__init__(self.message)13    def is_fatal(self):14        return False15class UnmappedRead(SENinjaError):16    def __init__(self, pc):17        self.pc = pc18        self.message = "unmapped read at 0x%x" % pc19        super().__init__(self.message)20    def is_fatal(self):21        return False22class UnmappedWrite(SENinjaError):23    def __init__(self, pc):24        self.pc = pc25        self.message = "unmapped write at 0x%x" % pc26        super().__init__(self.message)27    def is_fatal(self):28        return False29class NoDestination(SENinjaError):30    def __init__(self):31        self.message = "no destination"32        super().__init__(self.message)33    def is_fatal(self):34        return False35class UnconstrainedIp(SENinjaError):36    def __init__(self):37        self.message = "unconstrained ip"38        super().__init__(self.message)39    def is_fatal(self):40        return False41class UnsatState(SENinjaError):42    def __init__(self, pc):43        self.pc = pc44        self.message = "UNSAT state at 0x%x"45        super().__init__(self.message)46    def is_fatal(self):47        return True48class ModelError(SENinjaError):49    def __init__(self, model_name, msg):50        self.model_name = model_name51        self.message = "%s: %s" % (model_name, msg)52        super().__init__(self.message)53    def is_fatal(self):54        return True55class UnimplementedInstruction(SENinjaError):56    def __init__(self, instr_name, ip):57        self.instr_name = instr_name58        self.message = "%s instruction is unimplemented @ %#x" % (instr_name, ip)59        super().__init__(self.message)60    def is_fatal(self):61        return True62class UnimplementedModel(SENinjaError):63    def __init__(self, f_name):64        self.f_name = f_name65        self.message = "unimplemented model for function %s" % f_name66        super().__init__(self.message)67    def is_fatal(self):68        return True69class UnimplementedSyscall(SENinjaError):70    def __init__(self, syscall_n):71        self.syscall_n = syscall_n72        self.message = "unimplemented syscall %d" % syscall_n73        super().__init__(self.message)74    def is_fatal(self):75        return True76class UnsupportedOs(SENinjaError):77    def __init__(self, platform_name):78        self.platform_name = platform_name79        self.message = "unsupported os %s" % platform_name80        super().__init__(self.message)81    def is_fatal(self):82        return True83class UnsupportedArch(SENinjaError):84    def __init__(self, arch_name):85        self.arch_name = arch_name86        self.message = "unsupported arch %s" % arch_name87        super().__init__(self.message)88    def is_fatal(self):89        return True90# *****91class SENinjaExeption(Exception):92    pass93class ExitException(SENinjaExeption):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
