Best Python code snippet using lisa_python
workflow.py
Source:workflow.py  
...184        file_output = [_nest_variable(x) for x in file_output]185        std_output = [_nest_variable(x) for x in std_output]186    # For combined output, ensure at the same level (not nested) as main samples187    if step.parallel in ["multi-combined"]:188        file_vs = _merge_variables([_clean_output(_flatten_nested_input(v) if not is_cwl_record(v) else v)189                                    for v in file_output], file_vs)190    else:191        file_vs = _merge_variables([_clean_output(v) for v in file_output], file_vs)192    std_vs = _merge_variables([_clean_output(v) for v in std_output], std_vs)193    return file_output + std_output, file_vs, std_vs194def _flatten_nested_input(v):195    """Flatten a parallel scatter input -- we only get one of them to tools.196    """197    v = copy.deepcopy(v)198    assert v["type"]["type"] == "array"199    v["type"] = v["type"]["items"]200    return v201def _nest_variable(v):202    """Nest a variable when moving from scattered back to consolidated.203    """204    v = copy.deepcopy(v)205    v["type"] = {"type": "array", "items": v["type"]}206    return v207def _clean_output(v):208    """Remove output specific variables to allow variables to be inputs to next steps.209    """210    out = copy.deepcopy(v)211    outb = out.pop("outputBinding", {})212    if "secondaryFiles" in outb:213        out["secondaryFiles"] = outb["secondaryFiles"]214    return out215def _get_string_vid(vid):216    if isinstance(vid, basestring):217        return vid218    assert isinstance(vid, (list, tuple)), vid219    return "__".join(vid)220def _get_variable(vid, variables):221    """Retrieve an input variable from our existing pool of options.222    """223    if isinstance(vid, basestring):224        vid = get_base_id(vid)225    else:226        vid = _get_string_vid(vid)227    for v in variables:228        if vid == get_base_id(v["id"]):229            return copy.deepcopy(v)230    raise ValueError("Did not find variable %s in \n%s" % (vid, pprint.pformat(variables)))231def _handle_special_inputs(inputs, variables):232    """Adjust input variables based on special cases.233    This case handles inputs where we are optional or can have flexible choices.234    XXX Need to better expose this at a top level definition.235    """236    optional = [["config", "algorithm", "coverage"],237                ["config", "algorithm", "variant_regions"],238                ["config", "algorithm", "sv_regions"],239                ["config", "algorithm", "validate"],240                ["config", "algorithm", "validate_regions"]]241    all_vs = set([get_base_id(v["id"]) for v in variables])242    out = []243    for input in inputs:244        if input == ["reference", "aligner", "indexes"]:245            found_indexes = False246            for v in variables:247                vid = get_base_id(v["id"]).split("__")248                if vid[0] == "reference" and vid[1] in alignment.TOOLS:249                    out.append(vid)250                    found_indexes = True251            assert found_indexes, "Found no aligner indexes in %s" % [v["id"] for v in variables]252        elif input == ["reference", "snpeff", "genome_build"]:253            found_indexes = False254            for v in variables:255                vid = get_base_id(v["id"]).split("__")256                if vid[0] == "reference" and vid[1] == "snpeff":257                    out.append(vid)258                    found_indexes = True259            assert found_indexes, "Found no snpEff indexes in %s" % [v["id"] for v in variables]260        elif input in optional:261            if _get_string_vid(input) in all_vs:262                out.append(input)263        else:264            out.append(input)265    return out266def _get_upload_output(vid, variables):267    if isinstance(vid, dict) and "id" in vid:268        parent_v = _get_variable(vid["id"], variables)269        v = copy.deepcopy(vid)270        v["id"] = _get_string_vid(vid["id"])271        v["outputSource"] = parent_v["id"]272    else:273        v = _nest_variable(_get_variable(vid, variables))274        v["outputSource"] = v["id"]275        v["id"] = get_base_id(v["id"])276    v.pop("secondaryFiles", None)277    v["type"].pop("secondaryFiles", None)278    return v279def _create_record(name, field_defs, step_name, inputs, unlist, file_vs, std_vs, parallel):280    """Create an output record by rearranging inputs.281    Batching processes create records that reformat the inputs for282    parallelization.283    """284    if field_defs:285        fields = []286        inherit = []287        inherit_all = False288        for fdef in field_defs:289            if not fdef.get("type"):290                if fdef["id"] == "inherit":291                    inherit_all = True292                else:293                    inherit.append(fdef["id"])294            else:295                cur = {"name": _get_string_vid(fdef["id"]),296                       "type": fdef["type"]}297                fields.append(_add_secondary_to_rec_field(fdef, cur))298        if inherit_all:299            fields.extend(_infer_record_outputs(inputs, unlist, file_vs, std_vs, parallel))300        elif inherit:301            fields.extend(_infer_record_outputs(inputs, unlist + inherit, file_vs, std_vs, parallel, inherit))302    else:303        fields = _infer_record_outputs(inputs, unlist, file_vs, std_vs, parallel)304    out = {"id": "%s/%s" % (step_name, name),305           "type": {"name": name,306                    "type": "record",307                    "fields": fields}}308    if parallel in ["batch-single", "multi-batch"]:309        out = _nest_variable(out)310    return out311def _add_secondary_to_rec_field(orig, cur):312    # CWL does not currently support secondaryFiles in fields313    if orig.get("secondaryFiles"):314        cur["secondaryFiles"] = orig.get("secondaryFiles")315    return cur316def _infer_record_outputs(inputs, unlist, file_vs, std_vs, parallel, to_include=None):317    """Infer the outputs of a record from the original inputs318    """319    fields = []320    unlist = set([_get_string_vid(x) for x in unlist])321    input_vids = set([_get_string_vid(v) for v in _handle_special_inputs(inputs, file_vs)])322    to_include = set([_get_string_vid(x) for x in to_include]) if to_include else None323    added = set([])324    for raw_v in std_vs + [v for v in file_vs if get_base_id(v["id"]) in input_vids]:325        # unpack record inside this record and un-nested inputs to avoid double nested326        cur_record = is_cwl_record(raw_v)327        if cur_record:328            #unlist = unlist | set([field["name"] for field in cur_record["fields"]])329            nested_vs = [{"id": field["name"], "type": field["type"]} for field in cur_record["fields"]]330        else:331            nested_vs = [raw_v]332        for orig_v in nested_vs:333            if orig_v["id"] not in added and (not to_include or get_base_id(orig_v["id"]) in to_include):334                cur_v = {}335                cur_v["name"] = get_base_id(orig_v["id"])336                cur_v["type"] = orig_v["type"]337                if cur_v["name"] in unlist:338                    cur_v = _flatten_nested_input(cur_v)339                if to_include:340                    cur_v = _nest_variable(cur_v)341                fields.append(_add_secondary_to_rec_field(orig_v, cur_v))342                added.add(orig_v["id"])343    return fields344def _create_variable(orig_v, step, variables):345    """Create a new output variable, potentially over-writing existing or creating new.346    """347    # get current variable, and convert to be the output of our process step348    try:349        v = _get_variable(orig_v["id"], variables)350    except ValueError:351        v = copy.deepcopy(orig_v)352        if not isinstance(v["id"], basestring):353            v["id"] = _get_string_vid(v["id"])354    for key, val in orig_v.items():355        if key not in ["id", "type"]:356            v[key] = val357    if orig_v.get("type") != "null":358        v["type"] = orig_v["type"]359    v["id"] = "%s/%s" % (step.name, get_base_id(v["id"]))360    return v361def _merge_variables(new, cur):362    """Add any new variables to the world representation in cur.363    Replaces any variables adjusted by previous steps.364    """365    new_added = set([])366    out = []367    for cur_var in cur:368        updated = False369        for new_var in new:370            if get_base_id(new_var["id"]) == get_base_id(cur_var["id"]):371                out.append(new_var)372                new_added.add(new_var["id"])373                updated = True374                break375        if not updated:...agevalues.py
Source:agevalues.py  
...6def _roll(a):  # rolls a single die of "a" sides7    return random.randrange(1, a + 1)8910def _merge_variables(race, ch_class):               # returns three-item lists [base_age, #ofrolls, #ofsides]11    temp, final, number_of_classes = [], [0, 0, 0], len(ch_class)12    for a in range(number_of_classes):13        temp.append(datalocus.age_variables(race, ch_class[a]))14    for a in range(number_of_classes):15        if final[0] < temp[a][0]:16            final[0] = temp[a][0]17    for a in range(number_of_classes):18        if final[1] * final[2] < temp[a][1] * temp[a][2]:19            final[1], final[2] = temp[a][1], temp[a][2]20    return final212223def _natural_death(race):                                                                   # accepts ('Human')24    temp = random.choice([0, 0, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4])      # returns (96)25    var_value, age_thresholds = [8, 4, 6, 10, 20], datalocus.age_thresholds(race)26    # this calculates the age-span for the relevant category to determine which age_modifier to apply27    term = age_thresholds[4+round(temp/3)] - age_thresholds[3+round(temp/3)]28    age_modifier = 1 * int(term < 100) + 10 * int(100 <= term <= 250) + 20 * int(term > 250)29    # this formula computes the base age corresponding with the temp value30    temp_base = int(age_thresholds[round((temp+6.5)/2)]) + int(temp == 0 or temp == 2)31    # this formula computes the die roll, span modifier and operator corresponding with the temp value32    temp_roll = (_roll(var_value[temp]) * age_modifier + _roll(age_modifier) - 1) * (temp % 2 * -2 + 1)33    return temp_base + temp_roll343536# for i in range(100):37#     temp_death = _natural_death("Human")38#     print(temp_death)394041def generate_age(race, ch_class, level):42    temp, final, attr_names = _merge_variables(race, ch_class), [], ['Str', 'Int', 'Wis', 'Dex', 'Con', 'Cha', 'Com']43    age = temp[0]44    for a in range(temp[1]):45        age += _roll(temp[2])46    final.append(age - 1 + level)47    final.append(datalocus.age_cat(race, age)[0])48    final.append(datalocus.age_cat(race, age)[1])49    final.append(dict(zip(attr_names, datalocus.age_adj(final[1]))))50    final[3]['Exc'] = 051    final.append(_natural_death(race))52    return final535455# start = time.time()56# testrace = "Korobokuru"
...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
