Best Python code snippet using locust
setup.py
Source:setup.py  
# ... (excerpt begins here; the end of the preceding definition is not shown)


# Parses the scanned lines and outputs a configuration dictionary.
def parse(lines):
    """Parse scanned configuration lines into a configuration dictionary.

    Each entry of ``lines`` is indexed as ``entry[0]`` (the line number shown
    in error messages) and ``entry[1]`` (the text of the line).  Options are
    read first, up to the first ``sequence`` line; every line from there on is
    read as a sequence action (``sequence``, ``invoke`` or ``wait``).

    Returns a dictionary with the keys ``address``, ``mtu``, ``buffersize``,
    ``errorthreshold``, ``commands`` (each command is stored under both its
    name and its opcode) and ``sequence`` (the ``init`` and ``loop`` action
    lists).

    Every error is printed and ends the program through ``exit(1)``.
    """
    index = 0

    def parse_error(msg):
        # Error tied to the line currently being parsed (``index``).
        print("Error on line %d in configuration file: %s" % (lines[index][0], msg))
        exit(1)

    def check_error(msg):
        # Error found by the whole-file sanity checks (no line number).
        print("Error in configuration file: %s" % msg)
        exit(1)

    def parse_int(text, msg):
        # int(text, 0) accepts decimal, 0x.., 0o.. and 0b.. literals.
        try:
            return int(text, 0)
        except ValueError:
            parse_error(msg)

    def data_after_name():
        # Text following the first two tokens of the current line.  Callers
        # only use this after checking that a third token exists.
        return lines[index][1].split(None, 2)[2].strip()

    def scan_data(data_string):
        # Turns the data part of setdefaultdata/invoke into a Repeat, a list
        # of byte values, or a string.
        if data_string[0:6] == "repeat":
            contents = data_string[6:]
            # Both parentheses are required; the previous "and" only rejected
            # input that was missing both, and crashed on a bare "repeat".
            if not (contents.startswith("(") and contents.endswith(")")):
                parse_error("Invalid format of repeat")
            params = contents[1:-1].split(",")
            if len(params) != 2:
                parse_error("Invalid number of params to repeat")
            value = parse_int(params[0], "Parameters to repeat must be integers")
            times = parse_int(params[1], "Parameters to repeat must be integers")
            if not 0x00 <= value <= 0xFF:
                parse_error("Value to be repeated must be between 0x00 and 0xFF")
            if times < 1:
                parse_error("Value must be repeated a positive number of times")
            return Repeat(value, times)

        if data_string.startswith("{") and data_string.endswith("}"):
            items = data_string[1:-1].split(",")
            if not items or items[0] == "":
                parse_error("setdata cannot have an empty array")
            msg = "All values in array must be integers between 0x00 and 0xFF"
            values = [parse_int(item, msg) for item in items]
            # Explicit range check: an assert would be removed by "python -O".
            if any(not 0x00 <= value <= 0xFF for value in values):
                parse_error(msg)
            return values

        if data_string.startswith("\""):
            # One or more quoted strings separated only by whitespace; their
            # contents are concatenated.
            chars = []
            in_string = False
            for char in data_string:
                if char == "\"":
                    in_string = not in_string
                elif in_string:
                    chars.append(char)
                elif not char.isspace():
                    parse_error("characters between strings must be spaces")
            if in_string:
                parse_error("string must be closed by a double quotation mark")
            return "".join(chars)

        parse_error("Invalid data format: %s" % data_string)

    # Setup configuration dictionary and define default commands
    config = dict()
    config["address"] = None
    config["mtu"] = None
    config["buffersize"] = 4096
    config["errorthreshold"] = 5
    commands = config["commands"] = dict()
    for default_name, default_opcode in (
            ("ACTIVE", 0x10),
            ("SLEEP", 0x11),
            ("POWER_OFF", 0x12),
            ("REQ_PAYLOAD", 0x20),
            ("REQ_HK", 0x21),
            ("REQ_PUS", 0x22),
            ("SEND_TIME", 0x30),
            ("SEND_PUS", 0x31)):
        commands[default_name] = Command(default_name, default_opcode)
    # Second pass so that every name key precedes every opcode key, as before.
    for default_name in list(commands.keys()):
        commands[commands[default_name].opcode] = commands[default_name]
    config["sequence"] = dict()
    config["sequence"]["init"] = []
    config["sequence"]["loop"] = []

    # Options that take exactly one positive integer:
    # option name -> (config key, error message for a bad value)
    positive_int_options = {
        "setmtu": ("mtu", "MTU must be a positive integer"),
        "setbuffersize": ("buffersize", "Buffer size must be a positive integer"),
        "seterrorthreshold": ("errorthreshold", "Error threshold must be a positive integer"),
    }

    # Scan options
    while index < len(lines):
        elems = lines[index][1].split()
        option = elems[0]
        # If sequence appears, then we are done scanning the options
        if option == "sequence":
            break
        if option == "setaddress":
            if len(elems) != 2:
                parse_error("Wrong number of arguments to setaddress")
            msg = "Address must be an integer between 0x00 and 0x7F"
            addr = parse_int(elems[1], msg)
            if not 0x00 <= addr <= 0x7F:
                parse_error(msg)
            config["address"] = addr
        elif option in positive_int_options:
            key, msg = positive_int_options[option]
            if len(elems) != 2:
                parse_error("Wrong number of arguments to %s" % option)
            number = parse_int(elems[1], msg)
            if number <= 0:
                parse_error(msg)
            config[key] = number
        elif option == "addcommand":
            if len(elems) != 3:
                parse_error("Wrong number of arguments to addcommand")
            name = elems[1]
            if name in commands:
                parse_error("Command \"%s\" already exists" % name)
            msg = "Opcode must be a number between 0x50 and 0x7F."
            opcode = parse_int(elems[2], msg)
            if not 0x50 <= opcode <= 0x7F:
                parse_error(msg)
            if opcode in commands:
                parse_error("Command \"%s\" already has that opcode." % commands[opcode].name)
            commands[name] = Command(name, opcode)
            commands[opcode] = commands[name]
        elif option == "setdefaultdata":
            if len(elems) < 3:
                parse_error("Too few arguments to setdefaultdata")
            name = elems[1]
            if name not in commands:
                parse_error("Command \"%s\" not yet defined." % name)
            if not is_send(commands[name].opcode):
                parse_error("Can only use setdefaultdata for send opcodes")
            commands[name].data = scan_data(data_after_name())
        elif option == "setprintstyle":
            if len(elems) != 3:
                parse_error("Invalid number of arguments to setprintstyle")
            name = elems[1]
            if name not in commands:
                parse_error("Command \"%s\" not yet defined." % name)
            style = elems[2]
            if style not in ["none", "bytes", "bits", "string"]:
                parse_error("Invalid printstyle")
            commands[name].printstyle = style
        else:
            parse_error("Invalid option \"%s\"" % option)
        index += 1

    # Scan sequence
    seqtype = None
    while index < len(lines):
        elems = lines[index][1].split()
        action = elems[0]
        if action == "sequence":
            if len(elems) != 2:
                parse_error("Invalid number of arguments to sequence")
            # Reject unknown sequence types here instead of failing with a
            # KeyError on the first action appended to them.
            if elems[1] not in config["sequence"]:
                parse_error("Invalid sequence type \"%s\"" % elems[1])
            if elems[1] == seqtype:
                parse_error("sequence type declared twice")
            if elems[1] == "init" and seqtype is not None:
                parse_error("init must come before loop")
            seqtype = elems[1]
        elif action == "invoke":
            if len(elems) < 2:
                parse_error("Too few arguments to invoke")
            if seqtype is None:
                parse_error("Sequence type not yet defined")
            name = elems[1]
            if name not in commands:
                parse_error("Command \"%s\" not found" % name)
            command = commands[name]
            if len(elems) > 2:
                values = scan_data(data_after_name())
                if not is_send(command.opcode):
                    parse_error("Only send opcodes can have data associated with them")
                config["sequence"][seqtype].append(Invoke(command, values))
            else:
                if is_send(command.opcode) and command.data is None:
                    parse_error("Send command \"%s\" has no default data and no data specified by the invoke" % name)
                config["sequence"][seqtype].append(Invoke(command))
        elif action == "wait":
            if len(elems) != 2:
                parse_error("Invalid number of arguments to wait")
            if seqtype is None:
                parse_error("Sequence type not yet defined")
            # Zero has always been rejected here; the message now says so.
            msg = "argument to wait must be a positive integer"
            millis = parse_int(elems[1], msg)
            if millis <= 0:
                parse_error(msg)
            config["sequence"][seqtype].append(Wait(millis))
        else:
            parse_error("Invalid sequence action \"%s\"" % action)
        index += 1

    # Sanity checks
    if config["address"] is None:
        check_error("setaddress must be among the options")
    if config["mtu"] is None:
        check_error("setmtu must be among the options")
    if config["buffersize"] < config["mtu"]:
        check_error("Buffer size must be larger than MTU. (buffersize = %d, mtu = %d)" % (config["buffersize"], config["mtu"]))
    if len(config["sequence"]["init"]) == 0 and len(config["sequence"]["loop"]) == 0:
        check_error("Both init and loop cannot be empty. At least one action must be present")
    # end of parse()
    return config


# ... (generate_header(config) follows in the file; its body is not part of this excerpt)
Source:test_parse_c_type.py  
# ... (excerpt begins inside an earlier helper whose start is not shown)


def parse_error(input, expected_msg, expected_location):
    """Check that parsing ``input`` raises ParseError with the given args."""
    excinfo = py.test.raises(ParseError, parse, input)
    error_args = excinfo.value.args
    assert error_args[0] == expected_msg
    assert error_args[1] == expected_location


def make_getter(name):
    """Return a function that combines ``lib._CFFI_OP_<name>`` with a value."""
    opcode = getattr(lib, '_CFFI_OP_%s' % name)

    def getter(value):
        # The value sits above the low 8 bits, which hold the opcode.
        return (value << 8) | opcode

    return getter


Prim = make_getter('PRIMITIVE')
Pointer = make_getter('POINTER')
Array = make_getter('ARRAY')
OpenArray = make_getter('OPEN_ARRAY')
NoOp = make_getter('NOOP')
Func = make_getter('FUNCTION')
FuncEnd = make_getter('FUNCTION_END')
Struct = make_getter('STRUCT_UNION')
Enum = make_getter('ENUM')
Typename = make_getter('TYPENAME')


def test_simple():
    """Primitive type spellings, including extra whitespace."""
    cases = [
        ("int", lib._CFFI_PRIM_INT),
        ("signed int", lib._CFFI_PRIM_INT),
        ("  long  ", lib._CFFI_PRIM_LONG),
        ("long int", lib._CFFI_PRIM_LONG),
        ("unsigned short", lib._CFFI_PRIM_USHORT),
        ("long double", lib._CFFI_PRIM_LONGDOUBLE),
        (" float  _Complex", lib._CFFI_PRIM_FLOATCOMPLEX),
        ("double _Complex ", lib._CFFI_PRIM_DOUBLECOMPLEX),
    ]
    for source, primitive in cases:
        assert parse(source) == ['->', Prim(primitive)]


def test_array():
    """Fixed-size, open and nested arrays."""
    int_prim = Prim(lib._CFFI_PRIM_INT)
    assert parse("int[5]") == [int_prim, '->', Array(0), 5]
    assert parse("int[]") == [int_prim, '->', OpenArray(0)]
    assert parse("int[5][8]") == [int_prim, '->', Array(3), 5, Array(0), 8]
    assert parse("int[][8]") == [int_prim, '->', OpenArray(2), Array(0), 8]


def test_pointer():
    """Single and repeated pointers."""
    int_prim = Prim(lib._CFFI_PRIM_INT)
    assert parse("int*") == [int_prim, '->', Pointer(0)]
    assert parse("int***") == [int_prim, Pointer(0), Pointer(1), '->', Pointer(2)]


def test_grouping():
    """Pointers and arrays combined, with and without parentheses."""
    int_prim = Prim(lib._CFFI_PRIM_INT)
    assert parse("int*[]") == [int_prim, Pointer(0), '->', OpenArray(1)]
    assert parse("int**[][8]") == [
        int_prim, Pointer(0), Pointer(1), '->', OpenArray(4), Array(2), 8]
    assert parse("int(*)[]") == [
        int_prim, NoOp(3), '->', Pointer(1), OpenArray(0)]
    assert parse("int(*)[][8]") == [
        int_prim, NoOp(3), '->', Pointer(1), OpenArray(4), Array(0), 8]
    assert parse("int**(**)") == [
        int_prim, Pointer(0), Pointer(1), NoOp(2), Pointer(3), '->', Pointer(4)]
    assert parse("int**(**)[]") == [
        int_prim, Pointer(0), Pointer(1),
        NoOp(6), Pointer(3), '->', Pointer(4),
        OpenArray(2)]


def test_simple_function():
    """Function types with zero, one, several and variadic arguments."""
    int_prim = Prim(lib._CFFI_PRIM_INT)
    assert parse("int()") == [int_prim, '->', Func(0), FuncEnd(0), 0]
    assert parse("int(int)") == [
        int_prim, '->', Func(0), NoOp(4), FuncEnd(0), int_prim]
    assert parse("int(long, char)") == [
        int_prim,
        '->', Func(0), NoOp(5), NoOp(6), FuncEnd(0),
        Prim(lib._CFFI_PRIM_LONG),
        Prim(lib._CFFI_PRIM_CHAR)]
    assert parse("int(int*)") == [
        int_prim, '->', Func(0), NoOp(5), FuncEnd(0), int_prim, Pointer(4)]
    assert parse("int*(void)") == [
        int_prim, Pointer(0), '->', Func(1), FuncEnd(0), 0]
    assert parse("int(int, ...)") == [
        int_prim, '->', Func(0), NoOp(5), FuncEnd(1), 0, int_prim]


def test_internal_function():
    """Function types nested inside pointers, arrays and argument lists."""
    int_prim = Prim(lib._CFFI_PRIM_INT)
    assert parse("int(*)()") == [
        int_prim, NoOp(3), '->', Pointer(1), Func(0), FuncEnd(0), 0]
    assert parse("int(*())[]") == [
        int_prim, NoOp(6), Pointer(1),
        '->', Func(2), FuncEnd(0), 0,
        OpenArray(0)]
    assert parse("int(char(*)(long, short))") == [
        int_prim,
        '->', Func(0), NoOp(6), FuncEnd(0),
        Prim(lib._CFFI_PRIM_CHAR),
        NoOp(7), Pointer(5),
        Func(4), NoOp(11), NoOp(12), FuncEnd(0),
        Prim(lib._CFFI_PRIM_LONG),
        Prim(lib._CFFI_PRIM_SHORT)]


def test_fix_arg_types():
    """Function and array arguments come out as pointers."""
    int_prim = Prim(lib._CFFI_PRIM_INT)
    char_prim = Prim(lib._CFFI_PRIM_CHAR)
    assert parse("int(char(long, short))") == [
        int_prim,
        '->', Func(0), Pointer(5), FuncEnd(0),
        char_prim,
        Func(4), NoOp(9), NoOp(10), FuncEnd(0),
        Prim(lib._CFFI_PRIM_LONG),
        Prim(lib._CFFI_PRIM_SHORT)]
    assert parse("int(char[])") == [
        int_prim,
        '->', Func(0), Pointer(4), FuncEnd(0),
        char_prim,
        OpenArray(4)]


def test_enum():
    """Every known enum name, plain and as a pointer target."""
    for position, enum_name in enumerate(enum_names):
        assert parse("enum %s" % (enum_name,)) == ['->', Enum(position)]
        assert parse("enum %s*" % (enum_name,)) == [
            Enum(position), '->', Pointer(0)]


def test_error():
    """Error message and location for a range of malformed inputs."""
    failing_inputs = [
        # invalid combinations of type keywords
        ("short short int", "'short' after another 'short' or 'long'", 6),
        ("long long long", "'long long long' is too long", 10),
        ("short long", "'long' after 'short'", 6),
        ("signed unsigned int", "multiple 'signed' or 'unsigned'", 7),
        ("unsigned signed int", "multiple 'signed' or 'unsigned'", 9),
        ("long char", "invalid combination of types", 5),
        ("short char", "invalid combination of types", 6),
        ("signed void", "invalid combination of types", 7),
        ("unsigned struct", "invalid combination of types", 9),
        # malformed declarators
        ("", "identifier expected", 0),
        ("]", "identifier expected", 0),
        ("*", "identifier expected", 0),
        ("int ]**", "unexpected symbol", 4),
        ("char char", "unexpected symbol", 5),
        ("int(int]", "expected ')'", 7),
        ("int(*]", "expected ')'", 5),
        ("int(]", "identifier expected", 4),
        ("int[?]", "expected a positive integer constant", 4),
        ("int[24)", "expected ']'", 6),
        ("struct", "struct or union name expected", 6),
        ("struct 24", "struct or union name expected", 7),
        ("int[5](*)", "unexpected symbol", 6),
        ("int a(*)", "identifier expected", 6),
        ("int[123456789012345678901234567890]", "number too large", 4),
        # _Complex
        ("_Complex", "identifier expected", 0),
        ("int _Complex", "_Complex type combination unsupported", 4),
        ("long double _Complex", "_Complex type combination unsupported", 12),
    ]
    for source, message, location in failing_inputs:
        parse_error(source, message, location)


def test_number_too_large():
    """sys.maxsize is accepted as an array length; one more is rejected."""
    largest = sys.maxsize
    assert parse("char[%d]" % largest) == [
        Prim(lib._CFFI_PRIM_CHAR), '->', Array(0), largest]
    parse_error("char[%d]" % (largest + 1), "number too large", 5)


def test_complexity_limit():
    """A very long chain of array suffixes reports the complexity limit."""
    deeply_nested = "int" + "[]" * 2500
    parse_error(deeply_nested, "internal type complexity limit reached", 202)


def test_struct():
    """Every known struct/union name, plain and as a pointer target."""
    for position, struct_name in enumerate(struct_names):
        # Index 3 is parsed with the "union" tag; all others with "struct".
        tag = "union" if position == 3 else "struct"
        assert parse("%s %s" % (tag, struct_name)) == ['->', Struct(position)]
        assert parse("%s %s*" % (tag, struct_name)) == [
            Struct(position), '->', Pointer(0)]


def test_exchanging_struct_union():
    """Using the wrong tag for a known name is an error."""
    wrong_tag_msg = "wrong kind of tag: struct vs union"
    parse_error("union %s" % (struct_names[0],), wrong_tag_msg, 6)
    parse_error("struct %s" % (struct_names[3],), wrong_tag_msg, 7)


def test_identifier():
    """Every known typedef name, plain and as a pointer target."""
    for position, type_name in enumerate(identifier_names):
        assert parse("%s" % (type_name)) == ['->', Typename(position)]
        assert parse("%s*" % (type_name)) == [
            Typename(position), '->', Pointer(0)]


def test_cffi_opcode_sync():
    """``lib._CFFI_*`` values and primitive names agree with the Python side."""
    import cffi.model
    prefix = '_CFFI_'
    for attr in dir(lib):
        if not attr.startswith(prefix):
            continue
        assert getattr(cffi_opcode, attr[6:]) == getattr(lib, attr)
    known_primitives = sorted(cffi.model.PrimitiveType.ALL_PRIMITIVE_TYPES.keys())
    assert sorted(cffi_opcode.PRIMITIVE_TO_INDEX.keys()) == known_primitives


def test_array_length_from_constant():
    """Array lengths given as named constants."""
    int_prim = Prim(lib._CFFI_PRIM_INT)
    parse_error("int[UNKNOWN]", "expected a positive integer constant", 4)
    assert parse("int[FIVE]") == [int_prim, '->', Array(0), 5]
    assert parse("int[ZERO]") == [int_prim, '->', Array(0), 0]
    parse_error("int[NEG]", "expected a positive integer constant", 4)


def test_various_constant_exprs():
    """Decimal, hexadecimal and octal array lengths."""
    def array(n):
        return [Prim(lib._CFFI_PRIM_CHAR), '->', Array(0), n]

    accepted = [
        ("char[21]", 21),
        ("char[0x10]", 16),
        ("char[0X21]", 33),
        ("char[0Xb]", 11),
        ("char[0x1C]", 0x1C),
        ("char[0xc6]", 0xC6),
        ("char[010]", 8),
        ("char[021]", 17),
    ]
    for source, length in accepted:
        assert parse(source) == array(length)
    for source in ("char[08]", "char[1C]", "char[0C]"):
        parse_error(source, "invalid number", 5)
    # not supported (really obscure):
    #    "char[+5]"
    #    "char['A']"


def test_stdcall_cdecl():
    """Placement of the __stdcall / __cdecl calling-convention keywords."""
    int_prim = Prim(lib._CFFI_PRIM_INT)
    assert parse("int __stdcall(int)") == [
        int_prim, '->', Func(0), NoOp(4), FuncEnd(2), int_prim]
    assert parse("int __stdcall func(int)") == parse("int __stdcall(int)")
    assert parse("int (__stdcall *)()") == [
        int_prim, NoOp(3), '->', Pointer(1), Func(0), FuncEnd(2), 0]
    assert parse("int (__stdcall *p)()") == parse("int (__stdcall*)()")
    parse_error("__stdcall int", "identifier expected", 0)
    parse_error("__cdecl int", "identifier expected", 0)
    parse_error("int __stdcall", "expected '('", 13)


# ... (the rest of this file is not part of the excerpt)
Source:__main__.py  
#!/usr/bin/env python3.7
"""Main program for Python parser that outputs AST as Prolog term.

This is called by pykythe.pl, which further processes the AST.
"""

import argparse
import base64
from dataclasses import dataclass
import hashlib
import logging
import sys
import traceback
from lib2to3 import pytree
from lib2to3.pgen2 import parse as pgen2_parse
from lib2to3.pgen2 import tokenize as pgen2_tokenize
from typing import Iterable, Optional, Text, Tuple, Union

from . import ast_node, ast_raw, ast_cooked, ast_color, pod
from .typing_debug import cast as xcast

# TODO: a bit more refactoring - look at error types, for example.

RawBaseType = Union[ast_raw.Node, ast_raw.Leaf]


def main() -> int:
    """Main (uses sys.argv).

    Reads and parses args.srcpath, then writes three Prolog terms to
    args.out_fqn_ast: the file metadata, the AST with FQNs (or the error
    object that replaced it) and the colored token list.  Returns 0.
    """
    src_file: Optional[ast_node.File]
    parse_error: Optional['CompilationError']
    parse_tree: Optional[RawBaseType]
    with_fqns: Union['CompilationError', ast_cooked.Base]

    log = logging.getLogger('pykythe')
    log_handler = logging.StreamHandler()
    # TODO: use something emacs *compilation* recognizes
    log_handler.setFormatter(
            logging.Formatter('LOG %(asctime)s,%(msecs)03d-%(name)s-%(levelname)s: %(message)s',
                              datefmt='%H:%M:%S'))
    log.addHandler(log_handler)
    log.setLevel(logging.WARNING)

    # TODO: add to ast_node.File: args.root, args.corpus (even though they're in Meta)
    args = _get_args()
    log.info('Start parsing %s', args.srcpath)

    src_file, parse_error = _make_file(args)
    if src_file:
        assert not parse_error
        parse_tree, parse_error = _parse_file(src_file, args)
        if parse_tree:
            assert not parse_error
            parse_tree, with_fqns, parse_error = _process_ast(src_file, parse_tree, args)
        else:
            assert parse_error
            with_fqns = parse_error
        contents_bytes = src_file.contents_bytes
        contents_str = src_file.contents_str
        sha1_input = contents_bytes
        encoding = src_file.encoding
    else:
        assert parse_error
        parse_tree = None
        log.error('pykythe.__main__.main: Parse error: %s',
                  parse_error)  # DO NOT SUBMIT - error message form
        with_fqns = parse_error
        with open(args.srcpath, 'rb') as src_f:
            contents_bytes = xcast(bytes, src_f.read())
        contents_str = ''  # TODO: .decode('iso-8859-1') or .decode('utf-8', 'surrogateescape')
        # NOTE(review): on this path the digest is of empty bytes, not of the
        # bytes just read -- kept as it was; confirm that this is intended.
        sha1_input = b''
        encoding = 'ascii'

    # b64encode returns bytes, so use decode() to turn it into a
    # string, because json.dumps can't process bytes -- it
    # processes UTF-8 encoded strings.
    # NOTE(review): no decode() is applied below -- confirm Meta accepts bytes.
    meta = ast_cooked.Meta(kythe_corpus=args.kythe_corpus,
                           kythe_root=args.kythe_root,
                           path=args.srcpath,
                           language='python',
                           contents_base64=base64.b64encode(contents_bytes),
                           contents_str=contents_str,
                           contents_bytes=contents_bytes,
                           sha1=hashlib.sha1(sha1_input).hexdigest(),
                           encoding=encoding)

    colored = ast_color.ColorFile(src_file, parse_tree, dict(with_fqns.name_astns())).color()
    meta_str = meta.as_prolog_str() + '.'
    with_fqns_str = with_fqns.as_prolog_str() + '.'
    ast_color_str = ast_color.colored_list_as_prolog_str(colored) + '.'
    with open(args.out_fqn_ast, 'w') as out_fqn_ast_file:
        log.debug('Output fqn= %r', out_fqn_ast_file)
        for term in (meta_str, with_fqns_str, ast_color_str):
            print(term, file=out_fqn_ast_file)
    log.info('End parsing %s / meta:%d fqns:%d color:%d', args.srcpath, len(meta_str),
             len(with_fqns_str), len(ast_color_str))
    return 0


def _get_args() -> argparse.Namespace:
    """Parse the command-line arguments."""
    parser = argparse.ArgumentParser(description='Parse Python file, generating Kythe facts')
    add = parser.add_argument
    # TODO: allow nargs='+' for multiple inputs?
    add('--srcpath', required=True, help='Input file')
    add('--module', required=True, help='FQN of module corresponding to --src')
    add('--out_fqn_ast',
        required=True,
        help=('output file for AST (with FQNs) in Python term format. '
              'These are post-processed to further resolve names.'))
    add('--kythe_corpus',
        dest='kythe_corpus',
        default='',
        help='Value of "corpus" in Kythe facts')
    add('--kythe_root',
        dest='kythe_root',
        default='',
        help='Value of "root" in Kythe facts')
    # TODO: version should be a triple -- see fakesys.FAKE_SYS
    add('--python_version',
        default=3,
        choices=[2, 3],
        type=int,
        help='Python major version')
    return parser.parse_args()


def _syntax_error_as_parse_error(exc: SyntaxError, srcpath: str) -> 'ParseError':
    """Wrap a SyntaxError in a ParseError."""
    # TODO: This seems to sometimes be raised from an encoding error with
    #       msg='unknown encoding: ...', exc.lineno=None, exc.offset=None
    if exc.lineno or exc.offset:
        context = str(('', (exc.lineno, exc.offset)))
    else:
        context = ''
    return ParseError(msg=str(exc),
                      type='',
                      context=context,
                      value=exc.text or '',
                      srcpath=srcpath)


def _unicode_error_as_decode_error(exc: UnicodeDecodeError, srcpath: str) -> 'DecodeError':
    """Wrap a UnicodeDecodeError in a DecodeError."""
    # exc.object is the entire string, which can be determined
    # from the file
    return DecodeError(encoding=exc.encoding,
                       start=exc.start,
                       end=exc.end,
                       reason=exc.reason,
                       srcpath=srcpath)


def _pgen2_error_as_parse_error(exc: pgen2_parse.ParseError, srcpath: str) -> 'ParseError':
    """Wrap a lib2to3 pgen2 ParseError in this module's ParseError."""
    return ParseError(msg=str(exc),
                      type=pytree.type_repr(exc.type),
                      value=exc.value or '',
                      context=str(exc.context),
                      srcpath=srcpath)


def _make_file(
        args: argparse.Namespace) -> Tuple[Optional[ast_node.File], Optional['CompilationError']]:
    """Load args.srcpath; return (file, None) or (None, error)."""
    try:
        return ast_node.make_file(path=args.srcpath), None
    except SyntaxError as exc:
        return None, _syntax_error_as_parse_error(exc, args.srcpath)
    except UnicodeDecodeError as exc:
        return None, _unicode_error_as_decode_error(exc, args.srcpath)


def _parse_file(src_file: ast_node.File,
                args: argparse.Namespace) -> Tuple[RawBaseType, Optional['CompilationError']]:
    """Parse src_file; return (tree, None) or (None, error)."""
    try:
        return ast_raw.parse(src_file, args.python_version), None
    except pgen2_tokenize.TokenError as exc:
        failure: 'CompilationError' = ParseError(
                msg=str(exc), type='', context='', value='', srcpath=args.srcpath)
    except SyntaxError as exc:
        failure = _syntax_error_as_parse_error(exc, args.srcpath)
    except UnicodeDecodeError as exc:
        failure = _unicode_error_as_decode_error(exc, args.srcpath)
    except pgen2_parse.ParseError as exc:
        failure = _pgen2_error_as_parse_error(exc, args.srcpath)
    return None, failure


def _process_ast(
        src_file: ast_node.File, parse_tree: RawBaseType, args: argparse.Namespace
) -> Tuple[Optional[RawBaseType], Union['Crash', 'ParseError', ast_cooked.Base],
           Optional['ParseError']]:
    """Convert the raw parse tree to cooked nodes and add FQNs.

    Returns (parse_tree, with_fqns, error).  A pgen2 ParseError gives
    (None, error, error); any other exception is logged, its traceback
    printed, and gives (parse_tree, Crash, the exception itself).
    """
    log = logging.getLogger('pykythe')
    log.debug('RAW= %r', parse_tree)
    new_parse_tree: Optional[RawBaseType]
    with_fqns: Union[ast_cooked.Base, 'ParseError', 'Crash']
    parse_error: Optional[Union['ParseError', Exception]]
    try:
        cooked_nodes = ast_raw.cvt_parse_tree(parse_tree, args.python_version, src_file)
        with_fqns = ast_cooked.add_fqns(cooked_nodes, args.module, args.python_version)
    except pgen2_parse.ParseError as exc:
        parse_error = _pgen2_error_as_parse_error(exc, args.srcpath)
        # This can happen with, e.g. function def struct unpacking
        new_parse_tree = None
        with_fqns = parse_error
        log.error('pykythe.__main__._process_ast: Parse error: %s',
                  parse_error)  # DO NOT SUBMIT - error message form
    except Exception as exc:  # pylint: disable=broad-except
        # DO NOT SUBMIT: the parse_error assignment is probably incorrect
        parse_error = exc  # TODO: is this correct? we get this from an assertion check, for example
        new_parse_tree = parse_tree
        log.error(
                'pykythe.__main__._process_ast: Caught error in cvt_parse_tree/with_fqns: %s %r',
                exc, exc)
        traceback.print_exc()
        with_fqns = Crash(str=str(exc), repr=repr(exc), srcpath=args.srcpath)
    else:
        parse_error = None
        new_parse_tree = parse_tree
    return new_parse_tree, with_fqns, parse_error


class CompilationError(pod.PlainOldDataExtended):
    """Base error class that defines do-nothing pre_order, name_astns."""

    def pre_order(self) -> Iterable[pytree.Base]:  # pylint: disable=no-self-use
        """Yield nothing."""
        yield from ()

    def name_astns(self) -> Iterable[Tuple[ast_node.Astn, str]]:  # pylint: disable=no-self-use
        """Do-nothing version of ast_cooked.Base.name_astns()."""
        yield from ()


@dataclass(frozen=True)
class DecodeError(CompilationError):
    """Encapsulate UnicodeDecodeError."""

    encoding: str
    start: int
    end: int
    reason: str
    srcpath: str
    __slots__ = ['encoding', 'start', 'end', 'reason', 'srcpath']


@dataclass(frozen=True)
class ParseError(CompilationError):
    """Encapsulate parse.ParseError."""

    msg: str
    type: str  # pytree.type_repr(type)
    value: str
    context: str  # str(context)
    srcpath: str
    __slots__ = ['msg', 'type', 'value', 'context', 'srcpath']


@dataclass(frozen=True)
class Crash(CompilationError):
    """Encapsulate general exception."""

    str: Text
    repr: Text
    srcpath: Text
    __slots__ = ['str', 'repr', 'srcpath']


if __name__ == '__main__':
    if sys.version_info < (3, 6):
        raise RuntimeError(f'Version must be 3.6 or later: {sys.version_info}')
    sys.setrecursionlimit(2000)  # default is 1000; more is needed for pod._as_prolog_str()
    # ... (the rest of this block is cut off in the excerpt; the listing
    #      continues with a snippet labelled authentication.py)
Source:authentication.py  
...25            self.auth.changePassword(username, password)26            self.has_error = False27            return "Password changed."28        except AuthenticationException, e:29            self.error_message = self.parse_error(e)30    def check_login(self):31        action = self.formData.get("verb")32        # User is logging in33        if (action == "login"):34            username = self.formData.get("username")35            if username is not None:36                password = self.formData.get("password")37                self.login(username, password)38        # Normal page render, or logout39        else:40            username = self.sessionState.get("username")41            source   = self.sessionState.get("source")42            if username is not None:43                self.current_user = self.get_user(username, source)44        # User is logging out, make sure we ran get_user() first45        if (action == "logout"):46            self.logout()47    def create_role(self, rolename):48        try:49            self.role.createRole(rolename)50            self.has_error = False51            return username # TODO Where does this come from?52        except RolesException, e:53            self.error_message = self.parse_error(e)54    def create_user(self, username, password):55        try:56            self.auth.createUser(username, password)57            self.has_error = False58            return username59        except AuthenticationException, e:60            self.error_message = self.parse_error(e)61    def delete_role(self, rolename):62        try:63            self.role.deleteRole(rolename)64            self.has_error = False65            return rolename66        except RolesException, e:67            self.error_message = self.parse_error(e)68    def delete_user(self, username):69        try:70            self.auth.deleteUser(username)71            self.has_error = False72            return username73        except AuthenticationException, e:74        
    self.error_message = self.parse_error(e)75    def get_error(self):76        if self.has_error:77            return self.error_message78        else:79            return None80    def get_name(self):81        if self.current_user is not None:82            return self.current_user.realName()83        else:84            return "Guest";85    def get_plugins_access(self):86        return self.access.getPluginList()87    def get_plugins_auth(self):88        return self.auth.getPluginList()89    def get_plugins_roles(self):90        return self.role.getPluginList()91    def get_roles(self):92        my_roles = self.get_roles_list()93        length = len(my_roles)94        if length == 0:95            return ""96        elif length > 0:97            response = my_roles[0]98        if length > 1:99            for role in my_roles[1:]:100                response = response + ", " + role101        return response102    def get_roles_list(self):103        # Use the cache if we can104        if self.roleList is None:105            role_list = []106            try:107                if self.current_user is not None:108                    # Otherwise retrieve from the plugin109                    java_list = self.security.getRolesList(self.sessionState, self.current_user)110                    if java_list is not None:111                        for role in java_list:112                            role_list.append(role)113            except Exception, e:114                self.error_message = self.parse_error(e)115            role_list.append(self.GUEST_ROLE)116            self.roleList = role_list117        return self.roleList118    119    def get_permissions_list(self):120        # Use the cache if we can121        if self.permList is None:122            perm_list = []123            try:124                if self.current_user is not None:125                    # Otherwise retrieve from the plugin126                    java_list = 
self.security.getFilePermissions(self.sessionState, self.current_user)127                    if java_list is not None:128                        for group in java_list:129                            perm_list.append(group)130            except Exception, e:131                self.error_message = self.parse_error(e)132            #perm_list.append(self.GUEST_ROLE)133            self.permList = perm_list134            135        return self.permList136    def get_access_roles_list(self, recordId):137        try:138            roles_list = {}139            plugins = self.access.getPluginList()140            for plugin in plugins:141                self.access.setActivePlugin(plugin.getId())142                result = self.access.getSchemas(recordId)143                roles = []144                for role in result:145                    roles.append(role.get("role"))146                roles_list[plugin.getId()] = roles147            return roles_list148        except AccessControlException, e:149            self.error_message = self.parse_error(e)150    def get_user(self, username, source):151        try:152            self.active_auth_plugin = source153            user = self.security.getUser(self.sessionState, username, source)154            self.has_error = False155            return user156        except AuthenticationException, e:157            self.error_message = self.parse_error(e)158    def get_username(self):159        if self.current_user is None:160            return self.GUEST_USER161        user = self.current_user.getUsername()162        if user is None:163            return self.GUEST_USER164        else:165            return user166    def get_user_source(self):167        if self.current_user is None:168            return self.GUEST_ROLE169        source = self.current_user.getSource()170        if source is None:171            return self.GUEST_ROLE172        else:173            return source174    def grant_access(self, recordId, newRole):175        
try:176            newAccess = self.access.getEmptySchema()177            newAccess.init(recordId)178            newAccess.set("role", newRole)179            self.access.applySchema(newAccess)180            self.has_error = False181        except AccessControlException, e:182            self.error_message = self.parse_error(e)183    def has_role(self, role):184        if self.current_user is not None:185            my_roles = self.get_roles_list()186            if role in my_roles:187                return True188            else:189                return False190        else:191            return False192    def is_admin(self):193        return self.has_role("admin")194    def is_logged_in(self):195        if self.current_user is not None:196            return True197        else:198            return False199    def list_users(self, rolename):200        try:201            return self.role.getUsersInRole(rolename)202        except RolesException, e:203            self.error_message = self.parse_error(e)204    def login(self, username, password):205        try:206            self.current_user = self.auth.logIn(username, password)207            self.error_message = None208            self.sessionState.set("username", username)209            self.sessionState.set("source",   self.current_user.getSource())210            self.has_error = False211        except AuthenticationException, e:212            self.current_user  = None213            self.error_message = self.parse_error(e)214    def logout(self):215        if self.current_user is not None:216            try:217                self.security.logout(self.sessionState, self.current_user)218                self.current_user = None219                self.error_message = None220                self.has_error = False221            except AuthenticationException, e:222                self.error_message = self.parse_error(e)223    # Strip out java package names from224    # error strings.225    def parse_error(self, 
error):226        self.has_error = True227        message = error.getMessage()228        i = message.find(":")229        if i != -1:230            return message[i+1:].strip()231        else:232            return message.strip()233    def remove_role(self, username, rolename):234        try:235            self.role.removeRole(username, rolename)236            self.has_error = False237        except RolesException, e:238            self.error_message = self.parse_error(e)239    def revoke_access(self, recordId, newRole):240        try:241            oldAccess = self.access.getEmptySchema()242            oldAccess.init(recordId)243            oldAccess.set("role", newRole)244            self.access.removeSchema(oldAccess)245            self.has_error = False246        except AccessControlException, e:247            self.error_message = self.parse_error(e)248    def search_roles(self, query, source):249        try:250            self.active_role_plugin = source251            self.role.setActivePlugin(source)252            roles = self.role.searchRoles(query)253            self.has_error = False254            return roles255        except RolesException, e:256            self.error_message = self.parse_error(e)257    def search_users(self, query, source):258        try:259            self.active_auth_plugin = source260            self.auth.setActivePlugin(source)261            users = self.auth.searchUsers(query)262            self.has_error = False263            return users264        except AuthenticationException, e:265            self.error_message = self.parse_error(e)266    def set_access_plugin(self, plugin_id):267        try:268            self.active_access_plugin = plugin_id269            self.access.setActivePlugin(plugin_id)270            self.has_error = False271        except AccessControlException, e:272            self.error_message = self.parse_error(e)273    def set_auth_plugin(self, plugin_id):274        try:275            self.active_auth_plugin = 
plugin_id276            self.auth.setActivePlugin(plugin_id)277            self.has_error = False278        except AuthenticationException, e:279            self.error_message = self.parse_error(e)280    def set_role(self, username, rolename):281        try:282            self.role.setRole(username, rolename)283            self.has_error = False284        except RolesException, e:285            self.error_message = self.parse_error(e)286    def set_role_plugin(self, plugin_id):287        try:288            self.active_role_plugin = plugin_id289            self.role.setActivePlugin(plugin_id)290            self.has_error = False291        except RolesException, e:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
