Best Python code snippet using lemoncheesecake
test_ncbi.py
Source:test_ncbi.py  
#!/usr/bin/python
__author__ = "tomkinsc@broadinstitute.org"

# built-ins
import unittest
import os
import argparse
import pickle
import shutil
import tempfile
from collections import OrderedDict
import logging

# module-specific
from test import TestCaseWithTmp
import ncbi
import util.file

log = logging.getLogger(__name__)

skip_test = True


@unittest.skipIf(skip_test, "test is marked to be skipped")
class TestNcbiFetch(TestCaseWithTmp):
    """Common fixture and download-and-verify helper shared by the fetch tests."""

    def setUp(self):
        super(TestNcbiFetch, self).setUp()
        # these are Orungo accessions
        self.accessions = ["JQ610675.1", "JQ610676.1", "JQ610677.1", "JQ610678.1", "JQ610679.1", "JQ610680.1",
                           "JQ610681.1", "JQ610682.1", "JQ610683.1", "JQ610684.1"]
        self.myInputDir = util.file.get_test_input_path(self)

    def perform_download_and_check(self, parser_func, additional_args, expected_files, null_files):
        """Run one fetch command and verify the files it leaves behind.

        The command line is built from a fixed e-mail-style string, the
        directory returned by ``tempfile.gettempdir()``, every accession in
        ``self.accessions`` and finally ``additional_args``.  It is parsed by
        the parser that ``parser_func`` returns and executed via the parsed
        namespace's ``func_main``.

        parser_func      -- callable that receives an ``ArgumentParser`` and
                            returns the parser to use
        additional_args  -- extra command-line tokens appended after the accessions
        expected_files   -- file names that must exist in the temp directory and
                            match the same-named file under ``self.myInputDir``
        null_files       -- file names that must NOT exist in the temp directory

        Raises AssertionError when a file check fails.
        """
        out_dir = tempfile.gettempdir()
        cli_args = ["viral-ngs-test@example.com", out_dir] + list(self.accessions) + list(additional_args)
        parsed = parser_func(argparse.ArgumentParser()).parse_args(cli_args)
        parsed.func_main(parsed)

        # check that each expected file was downloaded
        # and that its contents match the reference copy
        for file_name in expected_files:
            created_path = os.path.join(out_dir, file_name)
            log.info("createdFilePath: {}".format(created_path))
            assert os.path.exists(created_path), \
                "File that should have been created does not exist: %s" % created_path
            self.assertEqualContents(created_path, os.path.join(self.myInputDir, file_name))

        # check that none of the unwanted files are present
        for file_name in null_files:
            unwanted_path = os.path.join(out_dir, file_name)
            assert not os.path.exists(unwanted_path), \
                "File exists but it should not: %s" % unwanted_path


class TestFastaFetch(TestNcbiFetch):
    """Download checks run through ``ncbi.parser_fetch_fastas``."""

    def setUp(self):
        super(TestFastaFetch, self).setUp()

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_download(self):
        """No extra arguments: one ``.fasta`` file per accession is expected."""
        self.perform_download_and_check(
            ncbi.parser_fetch_fastas,
            additional_args=[],
            expected_files=[acc + ".fasta" for acc in self.accessions],
            null_files=[])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_concat(self):
        """With a combined-file prefix, ``orungo.fasta`` is expected."""
        self.perform_download_and_check(
            ncbi.parser_fetch_fastas,
            additional_args=["--combinedFilePrefix", "orungo"],
            expected_files=["orungo.fasta"],
            null_files=[])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_removal_of_intermediates(self):
        """With ``--removeSeparateFiles`` the per-accession files must be absent."""
        self.perform_download_and_check(
            ncbi.parser_fetch_fastas,
            additional_args=["--combinedFilePrefix", "orungo", "--removeSeparateFiles"],
            expected_files=["orungo.fasta"],
            null_files=[acc + ".fasta" for acc in self.accessions])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_individual_preexistance(self):
        """A repeated accession is expected to end in an AssertionError."""
        # since the arguments are positional, including an accession here makes a duplicate that should
        # raise an Error
        with self.assertRaises(AssertionError):
            self.perform_download_and_check(
                ncbi.parser_fetch_fastas,
                additional_args=[self.accessions[0], "--combinedFilePrefix", "orungo"],
                expected_files=["orungo.fasta"],
                null_files=[])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_combined_preexistance(self):
        """Running the same combined fetch twice is expected to fail the second time."""
        call = dict(additional_args=["--combinedFilePrefix", "orungo"],
                    expected_files=["orungo.fasta"],
                    null_files=[])
        # call once to create the combined file
        self.perform_download_and_check(ncbi.parser_fetch_fastas, **call)
        # an error should be raised the second time the call is made
        with self.assertRaises(AssertionError):
            self.perform_download_and_check(ncbi.parser_fetch_fastas, **call)

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_overwrite(self):
        """With ``--forceOverwrite`` the same combined fetch may run twice."""
        call = dict(additional_args=["--combinedFilePrefix", "orungo", "--forceOverwrite"],
                    expected_files=["orungo.fasta"],
                    null_files=[])
        # call once to create the combined file
        self.perform_download_and_check(ncbi.parser_fetch_fastas, **call)
        # no error should be raised the second time the call is made
        self.perform_download_and_check(ncbi.parser_fetch_fastas, **call)

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_different_file_extension(self):
        """``--fileExt fa`` is expected to change the extension of every output file."""
        self.perform_download_and_check(
            ncbi.parser_fetch_fastas,
            additional_args=["--fileExt", "fa", "--combinedFilePrefix", "orungo"],
            expected_files=[acc + ".fa" for acc in self.accessions] + ["orungo.fa"],
            null_files=[])


class TestFeatureTableFetch(TestNcbiFetch):
    """Download checks run through ``ncbi.parser_fetch_feature_tables``."""

    def setUp(self):
        super(TestFeatureTableFetch, self).setUp()

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_download(self):
        """No extra arguments: one ``.tbl`` file per accession is expected."""
        self.perform_download_and_check(
            ncbi.parser_fetch_feature_tables,
            additional_args=[],
            expected_files=[acc + ".tbl" for acc in self.accessions],
            null_files=[])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_concat(self):
        """With a combined-file prefix, ``orungo.tbl`` is expected."""
        self.perform_download_and_check(
            ncbi.parser_fetch_feature_tables,
            additional_args=["--combinedFilePrefix", "orungo"],
            expected_files=["orungo.tbl"],
            null_files=[])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_removal_of_intermediates(self):
        """With ``--removeSeparateFiles`` the per-accession files must be absent."""
        self.perform_download_and_check(
            ncbi.parser_fetch_feature_tables,
            additional_args=["--combinedFilePrefix", "orungo", "--removeSeparateFiles"],
            expected_files=["orungo.tbl"],
            null_files=[acc + ".tbl" for acc in self.accessions])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_individual_preexistance(self):
        """A repeated accession is expected to end in an AssertionError."""
        # since the arguments are positional, including an accession here makes a duplicate that should
        # raise an Error
        with self.assertRaises(AssertionError):
            self.perform_download_and_check(
                ncbi.parser_fetch_feature_tables,
                additional_args=[self.accessions[0], "--combinedFilePrefix", "orungo"],
                expected_files=["orungo.tbl"],
                null_files=[])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_combined_preexistance(self):
        """Running the same combined fetch twice is expected to fail the second time."""
        call = dict(additional_args=["--combinedFilePrefix", "orungo"],
                    expected_files=["orungo.tbl"],
                    null_files=[])
        # call once to create the combined file
        self.perform_download_and_check(ncbi.parser_fetch_feature_tables, **call)
        # an error should be raised the second time the call is made
        with self.assertRaises(AssertionError):
            self.perform_download_and_check(ncbi.parser_fetch_feature_tables, **call)

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_overwrite(self):
        """With ``--forceOverwrite`` the same combined fetch may run twice."""
        call = dict(additional_args=["--combinedFilePrefix", "orungo", "--forceOverwrite"],
                    expected_files=["orungo.tbl"],
                    null_files=[])
        # call once to create the combined file
        self.perform_download_and_check(ncbi.parser_fetch_feature_tables, **call)
        # no error should be raised the second time the call is made
        self.perform_download_and_check(ncbi.parser_fetch_feature_tables, **call)

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_different_file_extension(self):
        """``--fileExt table`` is expected to change the extension of every output file."""
        self.perform_download_and_check(
            ncbi.parser_fetch_feature_tables,
            additional_args=["--fileExt", "table", "--combinedFilePrefix", "orungo"],
            expected_files=[acc + ".table" for acc in self.accessions] + ["orungo.table"],
            null_files=[])


class TestGenbankRecordFetch(TestNcbiFetch):
    """Download checks run through ``ncbi.parser_fetch_genbank_records``."""

    def setUp(self):
        super(TestGenbankRecordFetch, self).setUp()

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_download(self):
        """No extra arguments: one ``.gbk`` file per accession is expected."""
        self.perform_download_and_check(
            ncbi.parser_fetch_genbank_records,
            additional_args=[],
            expected_files=[acc + ".gbk" for acc in self.accessions],
            null_files=[])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_concat(self):
        """With a combined-file prefix, ``orungo.gbk`` is expected."""
        self.perform_download_and_check(
            ncbi.parser_fetch_genbank_records,
            additional_args=["--combinedFilePrefix", "orungo"],
            expected_files=["orungo.gbk"],
            null_files=[])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_removal_of_intermediates(self):
        """With ``--removeSeparateFiles`` the per-accession files must be absent."""
        self.perform_download_and_check(
            ncbi.parser_fetch_genbank_records,
            additional_args=["--combinedFilePrefix", "orungo", "--removeSeparateFiles"],
            expected_files=["orungo.gbk"],
            null_files=[acc + ".gbk" for acc in self.accessions])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_individual_preexistance(self):
        """A repeated accession is expected to end in an AssertionError."""
        # since the arguments are positional, including an accession here makes a duplicate that should
        # raise an Error
        with self.assertRaises(AssertionError):
            self.perform_download_and_check(
                ncbi.parser_fetch_genbank_records,
                additional_args=[self.accessions[0], "--combinedFilePrefix", "orungo"],
                expected_files=["orungo.gbk"],
                null_files=[])

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_combined_preexistance(self):
        """Running the same combined fetch twice is expected to fail the second time."""
        call = dict(additional_args=["--combinedFilePrefix", "orungo"],
                    expected_files=["orungo.gbk"],
                    null_files=[])
        # call once to create the combined file
        self.perform_download_and_check(ncbi.parser_fetch_genbank_records, **call)
        # an error should be raised the second time the call is made
        with self.assertRaises(AssertionError):
            self.perform_download_and_check(ncbi.parser_fetch_genbank_records, **call)

    @unittest.skipIf(skip_test, "test is marked to be skipped")
    def test_overwrite(self):
        """With ``--forceOverwrite`` the same combined fetch may run twice."""
        call = dict(additional_args=["--combinedFilePrefix", "orungo", "--forceOverwrite"],
                    expected_files=["orungo.gbk"],
                    null_files=[])
        # call once to create the combined file
        self.perform_download_and_check(ncbi.parser_fetch_genbank_records, **call)
        # no error should be raised the second time the call is made
        self.perform_download_and_check(ncbi.parser_fetch_genbank_records, **call)

@unittest.skipIf(skip_test, "test is marked to be skipped")288    def test_different_file_extension(self):289        args = ["--fileExt", "gb", "--combinedFilePrefix", "orungo"]290        expectedFiles = [a + ".gb" for a in self.accessions]291        expectedFiles.append("orungo.gb")292        null_files = []293        self.perform_download_and_check(ncbi.parser_fetch_genbank_records,294                                        additional_args=args,295                                        expected_files=expectedFiles,...validate_api.py
Source:validate_api.py  
1#!/usr/bin/env python32"""3   @author TELEMAC-MASCARET Consortium4   @brief Function for validation of the Python API5"""6from os import path, chdir, remove, listdir, sep7from filecmp import cmp8import shutil9from argparse import ArgumentParser10from vvytel import copy_file_to_tmp11from vvytel import get_result_file_name12from vvytel import run_telemac_api13from vvytel import run_telemac_normal14from config import add_config_argument, update_config, CFGS15MODULE_HANDLED = ['telemac2d', 'telemac3d', 'artemis', 'tomawac']16def main(modules, example, nncsize, clean):17    """18    Main function19    """20    # Running main function21    root_dir = CFGS.get_root()22    if path.exists('ValidationTelApy.log'):23        remove('ValidationTelApy.log')24    fichier = open('ValidationTelApy.log', 'a')25    fichier.write("-----Listing Validation telapy-------\n")26    seq_only = {}27    skip_test = {}28    # Specifcation for each module29    for module in MODULE_HANDLED:30        seq_only[module] = []31        skip_test[module] = []32    # Sequential only test cases33    seq_only['telemac2d'].append('t2d_hydraulic_jump_v1p0.cas')34    seq_only['telemac2d'].append('t2d_hydraulic_jump_v2p0.cas')35    seq_only['telemac2d'].append('t2d_wesel.cas')36    seq_only['telemac2d'].append('t2d_wesel_pos.cas')37    seq_only['telemac2d'].append('t2d_delwaq.cas')38    seq_only['telemac2d'].append('t2d_ruptmoui.cas')39    seq_only['telemac2d'].append('t2d_triangular_shelf.cas')40    seq_only['telemac2d'].append('t2d_island.cas')41    seq_only['telemac2d'].append('t2d_tide-jmj_real_gen.cas')42    seq_only['telemac2d'].append('t2d_tide-jmj_type_gen.cas')43    seq_only['telemac2d'].append('t2d_dambreak_v1p0.cas')44    seq_only['telemac3d'].append('t3d_delwaq.cas')45    seq_only['telemac3d'].append('t3d_pluie.cas')46    seq_only['telemac3d'].append('t3d_tide-jmj_real_gen.cas')47    seq_only['artemis'].append('none')48    seq_only['tomawac'].append('tom_turning_wind.cas')49    
seq_only['tomawac'].append('tom_manche.cas')50    seq_only['tomawac'].append('tom_manchelim.cas')51    # Test case that can not work with api52    # Using homere_adj not handle by api53    skip_test['telemac2d'].append('estimation')54    # Reruning telemac from homere not handled by api55    skip_test['telemac2d'].append('convergence')56    # Case that are not run by validation57    skip_test['telemac2d'].append('t2d_tide-jmj_type_med.cas')58    skip_test['telemac2d'].append('t2d_tide-ES_real.cas')59    # Non telemac3d case in folder60    skip_test['telemac3d'].append('t2d_canal.cas')61    skip_test['telemac3d'].append('p3d_amr.cas')62    skip_test['telemac3d'].append('p3d_bump.cas')63    skip_test['telemac3d'].append('p3d_canal.cas')64    skip_test['telemac3d'].append('p3d_cooper.cas')65    skip_test['telemac3d'].append('p3d_depot.cas')66    skip_test['telemac3d'].append('p3d_flume_slope.cas')67    skip_test['telemac3d'].append('p3d_gouttedo.cas')68    skip_test['telemac3d'].append('p3d_lock-hydro.cas')69    skip_test['telemac3d'].append('p3d_lock-nonhydro.cas')70    skip_test['telemac3d'].append('p3d_nonlinearwave.cas')71    skip_test['telemac3d'].append('p3d_piledepon.cas')72    skip_test['telemac3d'].append('p3d_piledepon-nonhydro.cas')73    skip_test['telemac3d'].append('p3d_pluie.cas')74    skip_test['telemac3d'].append('p3d_rouse.cas')75    skip_test['telemac3d'].append('p3d_stratification.cas')76    skip_test['telemac3d'].append('p3d_tetra.cas')77    skip_test['telemac3d'].append('p3d_vent.cas')78    skip_test['telemac3d'].append('p3d_V.cas')79    # Coupling test case80    skip_test['telemac3d'].append('depot')81    skip_test['telemac3d'].append('heat_exchange')82    # Artemis animated test case83    skip_test['artemis'].append('art_bj78_animated.cas')84    skip_test['artemis'].append('art_creocean_animated.cas')85    skip_test['artemis'].append('art_creocean_2.cas')86    skip_test['artemis'].append('art_creocean.cas')87    # Tomawac coupled test cases88    
skip_test['tomawac'].append('3Dcoupling')89    for module in modules:90        fichier.write("-- For module " + module + "\n")91        module_dir = path.join(root_dir, 'examples', module)92        list_test_case = []93        if example != '':94            list_test_case.append(example)95        else:96            list_test_case = sorted(listdir(module_dir))97        # Sequential only test_case98        for i, test_case in enumerate(list_test_case):99            if test_case in skip_test[module]:100                continue101            case_dir = path.join(module_dir, test_case)102            tmp_dir = path.join(case_dir, 'tmp')103            print("<"+str(i+1)+"/"+str(len(list_test_case))+'> '+str(test_case))104            fichier.write('Running test case '+test_case+'\n')105            list_file = copy_file_to_tmp.copy_file_to_tmp(\106                    case_dir, tmp_dir, \107                    module, root_dir, skip_test[module])108            chdir(tmp_dir)109            for cas, fortran in list_file:110                #111                # Running Telemac based on telapy112                #113                if cas in skip_test[module]:114                    continue115                # Get results names116                res_file = get_result_file_name.get_result_file_name(module,117                                                                     cas)118                api_res_file = res_file+'_api'119                # Running in sequential mode120                # if the case does not run in parallel121                if cas in seq_only[module]:122                    ncsize = 1123                else:124                    ncsize = nncsize125                passed_api = run_telemac_api.run_telemac_api(module, cas,126                                                             ncsize, fortran)127                if passed_api:128                    shutil.move(res_file, api_res_file)129                # Running Telemac classical way130                
#131                passed_normal = run_telemac_normal.run_telemac_normal(module,132                                                                      cas,133                                                                      ncsize)134                #135                # Result comparison between api and136                #  classical Telemac computation137                #138                if not passed_normal:139                    fichier.write('   Normal run crashed\n')140                if not passed_api:141                    fichier.write('   Api run crashed\n')142                if not passed_api or not passed_normal:143                    fichier.write(str(cas)+'                       FAILED'+'\n')144                    continue145                if not path.exists(res_file):146                    fichier.write('   Missing '+res_file+"\n")147                    fichier.write(str(cas)+'                       FAILED'+'\n')148                    continue149                if not path.exists(api_res_file):150                    fichier.write('   Missing '+api_res_file+"\n")151                    fichier.write(str(cas)+'                       FAILED'+'\n')152                    continue153                compare = cmp(res_file, api_res_file)154                if compare:155                    fichier.write(str(cas)+'                       PASSED'+'\n')156                else:157                    fichier.write(str(cas)+'                       FAILED'+'\n')158            if clean:159                chdir(module_dir+sep+test_case)160                shutil.rmtree(module_dir+sep+test_case+sep+'tmp')161        fichier.write('my work is done '+'\n')162if __name__ == "__main__":163# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<164# ~~ Reads config file ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~165    print('\n\nLoading Options and Configurations\n'+72*'~'+'\n')166    PARSER = ArgumentParser(\167        description='Make the validation 
of Telemac-Mascaret API '\168            'and/or executable using the API')169    PARSER = add_config_argument(PARSER)170    PARSER.add_argument(\171        "-m", "--module",172        dest='modules',173        default="telemac2d",174        help="specify the list of folder to validate seprated by ,")175    PARSER.add_argument(\176        "--clean",177        action="store_true",178        dest="clean",179        default=False,180        help="Remove tmp folders")181    PARSER.add_argument(\182        "-n", "--cnsize",183        dest='ncsize',184        default=4,185        help="specify the number of processor the test case will be run with")186    PARSER.add_argument(\187        "-e", "--example",188        dest='example',189        default="",190        help="specify the name of the test case to compute")191    ARGS = PARSER.parse_args()192    update_config(ARGS)193    main(ARGS.modules.split(','), \...ensembler.py
Source:ensembler.py  
1import sys, gzip, json, os, math, pickle, re, copy2import numpy as np3import multiprocessing as mp4pool_size = int(mp.cpu_count())5from datetime import datetime6from math import exp, log, sqrt7root_path = '/home/marsan/workspace/tnative'8sys.path.append(root_path)9from lib import dgen as dgen10from lib import top as top11#==========================================12#   machein learning flow wrapper13#==========================================  14# [container of model unit]15class ml_unit(object):16  def __init__(self, alg, D, data, tmin, tmax, en_plot, en_fast_data, skip_test, debug):17    self.ml = top.ml(alg=alg, D=D, en_plot=en_plot, en_fast_data=en_fast_data, debug=debug)18    self.data = data19    self.tmin = tmin20    self.tmax = tmax21    self.vrng = abs(tmax - tmin)22    self.vmin = 1 - self.vrng if self.vrng < 0.5 else self.vrng23    self.vmax = 124    self.y2p = []25    self.pnorm = -126    self.skip_test = skip_test27    # self.q = mp.Queue()28    # update data filter29    for fea, val in self.data.items():30      self.ml.dgen.tbl = eval("self.ml.dgen.tbl.filter(%s=val)" % (fea))31    32  # for multiprocess33  def train_unit(self):34    self.ml.train(self.tmin, self.tmax, self.vmin, self.vmax, skip_test=self.skip_test)35    # q.put([self.ml.learner])36    return self.ml.learner37class ensembler(object):38  def __init__(self, segments, vrate, D=2**24, en_fast_data=None, en_plot=False, en_pnorm=False, skip_test=False, debug=False):39    # samples40    self.D = D41    self.vmin = 1 - vrate42    self.vmax = 143    44    # ctrl45    self.en_plot = en_plot46    self.en_pnorm = en_pnorm47    self.skip_test = skip_test48    self.en_fast_data = en_fast_data49    self.debug = debug50    51    # initialize models 52    self.dgen = dgen.data_gen(D=self.D, en_fast_data=self.en_fast_data, debug=self.debug) # samples for final test53    self.ml_group = []54    for s in segments:55      item = ml_unit(alg=s['alg'], D=self.D, data=s['data'], tmin=s['tmin'], 
tmax=s['tmax'], en_plot=self.en_plot, en_fast_data=self.en_fast_data, skip_test=self.skip_test, debug=self.debug)56      self.ml_group.append(item)57      58  #-------------------------59  #   convension60  #-------------------------61  def finitize(self, n, e=35):62    return max(min(n, e), -e)   # make -e <= n <= e63  def merge_sigmoid(self, nlist, e=35):64    nlist = [-log(max((1/max(n, 1e-35) - 1), 1e-35)) for n in nlist]65    nmean = np.mean(nlist)66    nsig = 1. / (1. + exp(-self.finitize(nmean, e)))67    return nsig68  #-------------------------69  #   train & test70  #-------------------------71  def train_and_test(self, en_multi_threads=False):72    self.train_all(en_multi_threads)73    roc_auc, yr_ens, yp_ens = self.test()74    self.save(roc_auc)75    return roc_auc, yr_ens, yp_ens76  def train_all(self, en_multi_threads=True):77    processes = []78    mp_pool = mp.Pool(pool_size)79    for l in self.ml_group:80      if not en_multi_threads:81        l.ml.train(l.tmin, l.tmax, l.vmin, l.vmax, skip_test=self.skip_test)  # [single process for debug]82      else:83        p = mp_pool.apply_async(l.train_unit, ())84        processes.append((l, p))85        # p = mp.Process(target=l.train_unit, args=(l.q,))86        # processes.append(p)87        # p.start()88    if en_multi_threads:89      for l, p in processes:90        l.ml.learner = p.get()91      # for l in self.ml_group:92      #   l.ml.learner = l.q.get()[0]93      # for p in processes: p.join()94    print("[Ensembler] models training done @ %s" % datetime.now())95    return self.ml_group96  def test(self):97    print("\n%s\n#  [Ensembler] start grader %.2f - %.2f @ %s\n%s" % ("-"*60, 100*self.vmin, 100*self.vmax, datetime.now(), "-"*60))98    yr_ens = {}99    yp_ens = {}100    for s in self.ml_group:101      sdgen = copy.copy(self.dgen)102      for fea, val in s.data.items():103        sdgen.tbl = eval("sdgen.tbl.filter(%s=val)" % (fea))104      ids = [str(r.id) for r in sdgen.raw_range(self.vmin, 
self.vmax).only('id')]105      raw = sdgen.gen_data(self.vmin, self.vmax)106      # get y2p107      s.y2p = s.ml.learner.train(raw, training=False, info={'all_cnt': -1})108      if self.en_pnorm:109        s.pnorm = s.ml.grader.find_pnorm(s.y2p)110        s.y2p = [[y, min(1, p/s.pnorm)] for y, p in s.y2p]111      # map to ensembled y2p112      for i in range(len(ids)):113        key = ids[i]114        if key not in yp_ens: 115          yr_ens[key] = []116          yp_ens[key] = []117        yr_ens[key].append(s.y2p[i][0])118        yp_ens[key].append(s.y2p[i][1])119    y2p_ens = [(np.mean(yrs), self.merge_sigmoid(yp_ens[rid])) for rid, yrs in yr_ens.items()]120    grader = self.ml_group[0].ml.grader121    roc_auc = grader.auc_curve(y2p_ens)122    scan = grader.scan_all_threshold(y2p_ens)123    print("[Ensembler] ensembled ROC: %.3f%% @ %s" % (roc_auc*100, datetime.now()))124    return roc_auc, yr_ens, yp_ens125  #-------------------------126  #   layer-2127  #-------------------------128  def train_layer2(self):129    # collect samples130    Xt = []131    Yt = []132    sdgen = copy.copy(self.dgen)133    pass134  def test_layer2(self):135    pass136  #-------------------------137  #   model reuse138  #-------------------------139  def save(self, auc):140    filepath = "%s/models/m%i_v%i_auc_%i_%s" % (root_path, len(self.ml_group), (self.vmax-self.vmin)*100, auc*1000, datetime.now().strftime("%Y%m%d_%H%M"))141    trained_models = [mlu.ml.learner for mlu in self.ml_group]142    pickle.dump(trained_models, open(filepath, 'wb'))143    print("ensemble model saved in %s @ %s" % (filepath, datetime.now()))144    return filepath145#==========================================146#   experiments147#==========================================148def k_fold_ensemble(alg, k, vrate=0.1, en_plot=False, en_fast_data=None, skip_compare=True):149  if not skip_compare:150    print("="*5, '[train by single thread]', '='*40)151    top.ml(alg=alg, en_plot=en_plot).train(0, 1-vrate)152  
print("[%i_fold_%s_ensemble] start @ %s" % (k, alg, datetime.now()))153  segments = []154  step = (1.0 - vrate)/k155  for i in range(k):156    segments.append({157      'alg': alg,158      'tmin': step*(i+1),159      'tmax': step*i,160      'data': {161        'isad__ne': None,162        # 'label__ne': None,163      }164    })165  ens = ensembler(segments, vrate=vrate, en_plot=en_plot, en_fast_data=en_fast_data, skip_test=True)  # k-fold MUST skip_test=True!! since we block ens.vmin166  for item in ens.ml_group: # prevent models from getting test samples167    item.ml.dgen.tbl = item.ml.dgen.tbl.filter(rand__lt=ens.vmin) 168  return ens.train_and_test()169def xgboost_sklr(vrate=0.1, en_plot=False):170  segments = [171    {172      'alg': alg, 173      'tmin': 0, 174      'tmax': 1-vrate,175      'data': {176        'isad__ne': None,177        # 'label__ne': None,178      },179    } for alg in ['sklr', 'xgboost']180  ]181  return ensembler(segments, vrate=vrate, en_plot=en_plot).train_and_test()182#==========================================183#   dnq ensemble (divide-and-conquer)184#==========================================185dnq_segments = {186  'status'  : [('status', 'bad'), ('status', 'normal')],187  'lang'    : [('meta_lang__icontains', 'en'), ('meta_lang', '')],188  'domain'  : [('domain__icontains', '.net'), ('domain__icontains', '.com'), ('domain__icontains', '.org'), ('domain__icontains', '.uk')],189}190def dnq_ensemble(alg, segname, vrate=0.1, en_plot=False, skip_compare=True):191  srate = 1-vrate if vrate > 0.5 else vrate192  if not skip_compare:193    print("="*5, '[train by single thread]', '='*40)194    auc_single = top.ml(alg='sklr', en_plot=en_plot).train(0, srate)195  print("="*5, '[train by ensemble]', '='*40)196  if segname == 'all':197    segs = ([('fid__ne', -1)] + [j for k in dnq_segments.values() for j in k])198  else:199    segs = ([('fid__ne', -1)] + dnq_segments[segname])200  segments = [{201    'alg': alg, 202    'tmin': 0, 203    'tmax': 
srate,204    'data': {205      'isad__ne': None,206      s: v,207    },208  } for s,v in segs]209  print("[dnq_ensemble] condition: ", segments)210  ens = ensembler(segments, vrate=vrate, en_plot=en_plot)211  return ens.train_and_test()212#==========================================213#   verify214#==========================================215if __name__ == '__main__':216  cmd = str(sys.argv[1])217  vrate = float(sys.argv[2])218  if (len(sys.argv) >= 4): cmd2 = str(sys.argv[3])219  #220  if cmd == '5_fold_xgboost':221    k_fold_ensemble('xgboost', k=5, vrate=vrate, en_plot=False)222  elif cmd == '5_fold_sklr':223    k_fold_ensemble('sklr', k=5, vrate=vrate, en_plot=False, en_fast_data='D_20_tfidf_cnts')224  elif cmd == 'xgboost_sklr':225    xgboost_sklr(vrate=vrate, en_plot=False)226  elif cmd == 'dnq_ensemble':227    dnq_ensemble('sklr', cmd2, vrate=vrate, en_plot=False)    228  elif cmd == 'dnq_all':...test_carrito_compras.py
Source:test_carrito_compras.py  
1import pytest2skip_test = pytest.mark.skipif(False, reason="skip")3#34@skip_test5def test_create_carrito_quantity_is_cero():6    from module.producto import Producto7    from module.detalleproducto import DetalleProducto8    from module.carrito import Carrito9    name = "producto 1"10    stock= 511    description= "description"12    producto_valido = Producto.create_product(13        name=name, 14        stock=stock, 15        description=description)16            17    assert producto_valido["name"] == name18    assert producto_valido["stock"] == stock19    assert producto_valido["description"] == description20    cantidad = 021    with pytest.raises(ValueError, match="cantidad can not be 0"):22        detalle_producto = DetalleProducto.crear_detalle(23            cantidad=cantidad,24            producto= producto_valido25        )26#527@skip_test28def test_create_carrito():29    from module.producto import Producto30    from module.detalleproducto import DetalleProducto31    from module.carrito import Carrito32    name = "producto 1"33    stock=1034    description= "description"35    producto_valido = Producto.create_product(36        name=name, 37        stock=stock, 38        description=description)39    40    assert producto_valido["name"] == name41    assert producto_valido["stock"] == stock42    assert producto_valido["description"] == description43    producto_valido_2 = Producto.create_product(44        name=name, 45        stock=stock, 46        description=description)47    48    assert producto_valido_2["name"] == name49    assert producto_valido_2["stock"] == stock50    assert producto_valido_2["description"] == description51    cantidad = 152    detalle_producto = DetalleProducto.crear_detalle(53        cantidad=cantidad,54        producto= producto_valido55    )56    assert detalle_producto["cantidad"] == cantidad57    detalle_producto_2 = DetalleProducto.crear_detalle(58        cantidad=cantidad,59        producto= producto_valido60    )61    
assert detalle_producto_2["cantidad"] == cantidad62    lista_detalles =[detalle_producto, detalle_producto_2]63    carrito = Carrito.agregar_producto(lista_productos=lista_detalles)64    assert len(carrito) > 065#666@skip_test67def test_nombre_producto_distinto_null():68    from module.producto import Producto69    with pytest.raises(ValueError, match="none is not an allowed value"):70        crear_producto = Producto.create_product(71            name=None, 72            stock=1, 73            description="descripcion")74#775@skip_test76def test_nombre_producto_numerico():77    from module.producto import Producto78    with pytest.raises(ValueError, match="name can not be an integer"):79        crear_producto = Producto.create_product(80            name=11111, 81            stock=1, 82            description="descripcion")83#984@skip_test85def test_descripcion_producto_distinto_null():86    from module.producto import Producto87    with pytest.raises(ValueError, match="none is not an allowed value"):88        crear_producto = Producto.create_product(89            name="Juan", 90            stock=1, 91            description=None)92#1293@skip_test94def test_stock_producto_no_negativo():95    from module.producto import Producto96    with pytest.raises(ValueError, match="ensure this value is greater than -1"):97        crear_producto = Producto.create_product(98            name="juan", 99            stock=-1, 100            description="descripcion")101#8102@skip_test103def test_nombre_producto_name_too_long():104    from module.producto import Producto105    from random import choice106    from string import ascii_uppercase107    name = ''.join(choice(ascii_uppercase) for i in range(11))108    with pytest.raises(ValueError, match="ensure this value has at most 10 characters"):109        crear_producto = Producto.create_product(110            name=name, 111            stock=1, 112            description="descripcion")113#11114@skip_test115def 
test_create_product():116    from module.producto import Producto117    name = "juan"118    stock=10119    description= "description"120    crear_producto = Producto.create_product(121        name=name, 122        stock=stock, 123        description=description)124    125    assert crear_producto["name"] == name126    assert crear_producto["stock"] == stock127    assert crear_producto["description"] == description128#13129@skip_test130def test_stock_producto_puede_ser_cero():131    from module.producto import Producto132    crear_producto = Producto.create_product(133        name="juan", 134        stock=0, 135        description="descripcion")136    137    assert crear_producto["stock"] == 0138#15139@skip_test140def test_stock_producto_distinto_null():141    from module.producto import Producto142    with pytest.raises(ValueError, match="none is not an allowed value"):143        crear_producto = Producto.create_product(144            name="juan", 145            stock=None, 146            description="descripcion")147#17148@skip_test149def test_stock_valido_name_nulo_descripcion_nula():150    from module.producto import Producto151    with pytest.raises(ValueError, match="none is not an allowed value"):152        crear_producto = Producto.create_product(153            name=None, 154            stock=0, 155            description=None)156#18157@skip_test158def test_descripcion_valido_name_nulo_stock_nulo():159    from module.producto import Producto160    with pytest.raises(ValueError, match="none is not an allowed value"):161        crear_producto = Producto.create_product(162            name=None, 163            stock=None, 164            description="descripcion")165#19166@skip_test167def test_create_detalle_producto():168    from module.producto import Producto169    from module.detalleproducto import DetalleProducto170    name = "producto 1"171    stock=10172    description= "description"173    producto_valido = Producto.create_product(174        name=name, 175 
       stock=stock, 176        description=description)177    178    assert producto_valido["name"] == name179    assert producto_valido["stock"] == stock180    assert producto_valido["description"] == description181    cantidad = 1182    detalle_producto = DetalleProducto.crear_detalle(183        cantidad=cantidad,184        producto= producto_valido185    )186    assert detalle_producto["cantidad"] == cantidad187    assert detalle_producto["producto"]["stock"] == stock-cantidad188#20189@skip_test190def test_create_detalle_producto_cantidad_negativa():191    from module.producto import Producto192    from module.detalleproducto import DetalleProducto193    name = "producto 1"194    stock=10195    description= "description"196    producto_valido = Producto.create_product(197        name=name, 198        stock=stock, 199        description=description)200    201    assert producto_valido["name"] == name202    assert producto_valido["stock"] == stock203    assert producto_valido["description"] == description204    cantidad = 0205    with pytest.raises(ValueError, match="cantidad can not be 0"):206        detalle_producto = DetalleProducto.crear_detalle(207            cantidad=cantidad,208            producto=producto_valido...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
