Best Python code snippet using lemoncheesecake
Datalake_Extraction_Metadonnes.py
Source:Datalake_Extraction_Metadonnes.py  
1#-- Votre code python pour : Module Extraction des Metadonnes 2# -*- coding: utf-8 -*-34import sys, os, fnmatch, re, random5from bs4 import BeautifulSoup6from Datalake_Parametrage import myPathRoot_DATASOURCE7from Datalake_Parametrage import myPathRoot_LANDINGZONE8from Datalake_Parametrage import myPathRoot_CURRATEDZONE910myPathCuratedZone = myPathRoot_CURRATEDZONE11myPathHtmlSOC = myPathRoot_LANDINGZONE + "/GLASSDOOR/SOC/"12myPathHtmlAVI = myPathRoot_LANDINGZONE + "/GLASSDOOR/AVI/"13myPathHtmlEMP = myPathRoot_LANDINGZONE + "/LINKEDIN/EMP/"1415#==============================================================================16#-- Parcourir et faire un traitement sur des fichiers d'un répertoire 17#==============================================================================18myListOfFileSOC = []19myListOfFileAVI = []20myListOfFileEMP = []2122#-- ramène tous les noms des fichiers du répertoire 23myListOfFileSOC = os.listdir(myPathHtmlSOC)24myListOfFileAVI = os.listdir(myPathHtmlAVI)25myListOfFileEMP = os.listdir(myPathHtmlEMP)2627###############################################################################â  28# SUBFUNCTIONS29###############################################################################â30from datetime import datetime31def Get_datetime_ingestion_AVI():32    Result = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))33    return(Result)3435#==============================================================================36#-- Extraction d'information à partir d'un flux ou d'un fichier HTML en python37#==============================================================================3839###############################################################################â40#GLASSDOOR (extraction INFOS SUR ENTREPRISE)41###############################################################################42def Generation_Fichiers_avec_Metadonnees_SOC():43    myFilePathName = myPathCuratedZone + "SOC.txt"44    myFilePtr = open(myFilePathName, "w", encoding = "utf-8")45    myListeDeLigneAEcrire = [] 46    #Societe glassdoor47    myListeDeLigneAEcrire.append('cle_unique;emplacement_source;datetime_ingestion;privacy_level;nom_entreprise;nro_avis;site_web;taille;date_fondation;secteur;revenu;type_entreprise'+ '\n')48    49    for myEntry in myListOfFileSOC :  50        f = open(myPathHtmlSOC+myEntry, "r", encoding="utf8")51        myHTMLContents = f.read()52        f.close()53        mySoup = BeautifulSoup(myHTMLContents, 'lxml') 54        if len(mySoup.find_all('div', attrs = {'class':"infoEntity"})) == 7 :55            cle_unique=str(random.getrandbits(128))56            emplacement_source=myEntry57            datetime_ingestion=str(Get_datetime_ingestion_AVI())58            privacy_level="0"59            nom_entreprise=(mySoup.find('h1')['data-company'])60            nro_avis=str(mySoup.find_all('a', attrs = {'data-label':"Avis"})[0].span.contents[0]).replace(";", ",").replace('\n', ' ').replace('\r', '')61            site_web=re.search('>(.+?)<',str(mySoup.find_all('div', attrs = {'class':"infoEntity"})[0].span.contents[0])).group(1)62            taille= str(mySoup.find_all('div', attrs = {'class':"infoEntity"})[2].span.contents[0]).replace(";", ",").replace('\n', ' ').replace('\r', '')63            date_fondation=str(mySoup.find_all('div', attrs = {'class':"infoEntity"})[3].span.contents[0]).replace(";", ",").replace('\n', ' ').replace('\r', '')64            secteur=str(mySoup.find_all('div', attrs = {'class':"infoEntity"})[5].span.contents[0]).replace(";", ",").replace('\n', ' ').replace('\r', '')65            revenu=str(mySoup.find_all('div', attrs = {'class':"infoEntity"})[6].span.contents[0]).replace(";", ",").replace('\n', ' ').replace('\r', '')66            type_entreprise=str(mySoup.find_all('div', attrs = {'class':"infoEntity"})[4].span.contents[0]).replace(";", ",").replace('\n', ' ').replace('\r', '')67    68            myListeDeLigneAEcrire.append(cle_unique+";"+emplacement_source+";"+datetime_ingestion+";"+privacy_level+";"+nom_entreprise+";"+nro_avis+";"+site_web+";"+taille+";"+date_fondation+";"+secteur+";"+revenu+";"+type_entreprise+ '\n')69        70    myFilePtr.writelines(myListeDeLigneAEcrire)71    #close stream    72    myFilePtr.close()73    return (True)74   75###############################################################################76#==============================================================================77#-- GLASSDOOR (AVIS)78#==============================================================================79def Get_nom_entreprise_AVI (Soup):80    myTest = Soup.find_all('div', attrs = {"class":"header cell info"})[0].span.contents[0]81    if (myTest == []) : 82        Result = "NULL"83    else:84        Result = myTest.replace(";", ",").replace('\n', ' ').replace('\r', '')85    return(Result)8687def Get_note_moy_entreprise_AVI(Soup):88    myTest = Soup.find_all('div', attrs = {'class':'v2__EIReviewsRatingsStylesV2__ratingNum v2__EIReviewsRatingsStylesV2__large'})[0].contents[0]89    if (myTest == []) : 90        Result = "NULL"91    else:92        Result = myTest.replace(";", ",").replace('\n', ' ').replace('\r', '') 93    return(Result)9495def Get_employe_actual(soup2):96    myTest2 = soup2.find_all('span', attrs = {'class':'authorJobTitle middle reviewer'})97    if (myTest2 == []) :        98        return "NULL"99    else :100        return (re.sub(r'<span (.*)">(.*)</span>(.*)', r'\2', str(myTest2[0])).replace(";", ",").replace('\n', ' ').replace('\r', ''))101           102def Get_ville_employe(soup2):103    myTest2 = soup2.find_all('span', attrs = {'class':'authorLocation'}) 104    if (myTest2 == []) :105        return "NULL"106    else :107        return (re.sub(r'<span (.*)">(.*)</span>(.*)', r'\2', str(myTest2[0])).replace(";", ",").replace('\n', ' ').replace('\r', ''))108    109def Get_commentaire(soup2):110    myTest2= soup2.find_all('p', attrs = {'class':'mainText mb-0'}) 111    if (myTest2 == []) :        112        return "NULL"113    else :114        return (myTest2[0].text.replace(";", ",").replace('\n', ' ').replace('\r', ''))115        116def Get_date(soup2):117    myTest2= soup2.find_all('time', attrs = {'class':'date subtle small'}) 118    if (myTest2 == []) :        119        return "NULL"120    else :121        return (myTest2[0].text.replace(";", ",").replace('\n', ' ').replace('\r', ''))122123def Get_review_titre(soup2):124    myTest2= soup2.find_all('a', attrs = {'class':'reviewLink'}) 125    if (myTest2 == []) :        126        return "NULL"127    else :128        return (myTest2[0].text.replace(";", ",").replace('\n', ' ').replace('\r', ''))129    130def Get_recommend(soup2):131    myTest2= soup2.find_all('div', attrs = {'class':'row reviewBodyCell recommends'})132    if (myTest2 == []) :        133        return "Ne recommande pas"134    else :135        return (myTest2[0].contents[0].text.replace(";", ",").replace('\n', ' ').replace('\r', ''))136    137def Get_avantages(soup2):138    myTest2= soup2.find_all('div', attrs = {'class':'mt-md common__EiReviewTextStyles__allowLineBreaks'})139    if (myTest2 == []) :        140        return "Ne recommande pas"141    else :142        return (myTest2[0].contents[1].text.replace(";", ",").replace('\n', ' ').replace('\r', ''))143    144def Get_inconvenients(soup2):145    myTest2 = soup2.find_all('div', attrs = {'class':'mt-md common__EiReviewTextStyles__allowLineBreaks'})146    if (myTest2 == []) :        147        return "Pas dâavantages pour les employés"148    else :149        leng = len(myTest2)150        if (leng == 2) : 151            Result = myTest2[1].contents[1].text.replace(";", ",").replace('\n', ' ').replace('\r', '')152        else:153            Result = "Pas dâavantages pour les employés"154        return (Result)155156###############################################################################â   157# Exemple : GLASSDOOR (extraction AVIS SUR ENTREPRISE)158###############################################################################159# f = open(myPathHtmlAVI+myListOfFileAVI[0], "r", encoding="utf8")160# myHTMLContents = f.read()161# f.close() 162# mySoup = BeautifulSoup(myHTMLContents, 'lxml')163# avis = mySoup.find_all('li', attrs = {'class':'empReview'})164# soup2 = BeautifulSoup(str(avis[1]), 'lxml')165# t = Get_avantages(soup2)166# print(t)167168def Generation_Fichiers_avec_Metadonnees_AVI():169    myFilePathName = myPathCuratedZone + "AVI.txt"170    myFilePtr = open(myFilePathName, "w", encoding = "utf-8")171    myListeDeLigneAEcrire = [] 172    #avis glassdoor173    myListeDeLigneAEcrire.append('cle_unique;emplacement_source;datetime_ingestion;privacy_level;entreprise;date;review_titre;status_employe;lieu;recommande;commentaire;avantage;incovenient'+"\n")174    175    for myEntry in myListOfFileAVI :  176        f = open(myPathHtmlAVI+myEntry, "r", encoding="utf8")177        myHTMLContents = f.read()178        f.close() 179        mySoup = BeautifulSoup(myHTMLContents, 'lxml')180        avis = mySoup.find_all('li', attrs = {'class':'empReview'})181        myListTab=[[]]182        if (avis == []) : 183            print("NULL")184        else:185            for x in range(0, len(avis)) :186                if x == 0: 187                    myListTab[0] = ['"0"']188                else:189                    soup2 = BeautifulSoup(str(avis[x]), 'lxml')190                    cle_unique=str(random.getrandbits(128))191                    emplacement_source=myEntry192                    datetime_ingestion=str(Get_datetime_ingestion_AVI())193                    privacy_level="0"194                    entreprise=Get_nom_entreprise_AVI(mySoup)195                    date=Get_date(soup2)196                    review_titre=Get_review_titre(soup2)[1:-1]197                    status_employe=Get_employe_actual(soup2)198                    lieu=Get_ville_employe(soup2)199                    recommande=Get_recommend(soup2)200                    commentaire=Get_commentaire(soup2)201                    avantage=Get_avantages(soup2)202                    incovenient=Get_inconvenients(soup2)203                204                    myListeDeLigneAEcrire.append(cle_unique+";"+emplacement_source+";"+datetime_ingestion+";"+privacy_level+";"+entreprise+";"+date+";"+review_titre+";"+status_employe+";"+lieu+";"+recommande+";"+commentaire+";"+avantage+";"+incovenient+'\n')205        206    myFilePtr.writelines(myListeDeLigneAEcrire)207    myFilePtr.close()208    return (True)209    210###############################################################################211#==============================================================================212#-- LINKEDIN (EMPLOI) 213#==============================================================================214def Get_libelle_emploi_EMP(Soup):215    myTest = Soup.find_all('h1', attrs = {'class':'topcard__title'})216    if (myTest == []) : 217        Result = "NULL"218    else:219        myTest = str(myTest[0].text)220        if (myTest == []) : 221            Result = "NULL"222        else:223            Result = myTest.replace(";", ",").replace('\n', ' ').replace('\r', '')224    return(Result)225226def Get_nom_entreprise_EMP(Soup):227    myTest = Soup.find_all('span', attrs = {'class':'topcard__flavor'}) 228    if (myTest == []) : 229        Result = "NULL"230    else:231        myTest = str(myTest[0].text)232        if (myTest == []) : 233            Result = "NULL"234        else :235            Result = myTest.replace(";", ",").replace('\n', ' ').replace('\r', '')236    return(Result)237238def Get_ville_emploi_EMP (Soup):239    myTest = Soup.find_all('span', attrs = {'class':'topcard__flavor topcard__flavor--bullet'}) 240    if (myTest == []) : 241        Result = "NULL"242    else:243        myTest = str(myTest[0].text)244        if (myTest == []) : 245            Result = "NULL"246        else:247            Result = myTest.replace(";", ",").replace('\n', ' ').replace('\r', '')248    return(Result)249250251def Get_date_emploi_EMP (Soup):252    myTest = Soup.find_all('span', attrs = {'class':'topcard__flavor--metadata posted-time-ago__text'})253    if (myTest == []) : 254        Result = "NULL"255    else:256        myTest = str(myTest[0].text)257        if (myTest == []) : 258            Result = "NULL"259        else:260            Result = myTest.replace(";", ",").replace('\n', ' ').replace('\r', '')261    return(Result)262263def Get_candidats_emploi_EMP (Soup):264    myTest = Soup.find_all('span', attrs = {'class':'topcard__flavor--metadata topcard__flavor--bullet num-applicants__caption'})265    if (myTest == []) : 266        Result = "0"267    else:268        myTest = str(myTest[0].text)269        if (myTest == []) : 270            Result = "0"271        else:272            Result = myTest.replace(";", ",").replace('\n', ' ').replace('\r', '')273    return(Result)274275def Get_texte_emploi_EMP (Soup):276    myTest = Soup.find_all('div', attrs = {"description__text description__text--rich"})277    if (myTest == []) : 278        Result = "NULL"279    else:280        myTest = str(myTest[0].text)281        if (myTest == []) : 282            Result = "NULL"283        else:284            Result = myTest.replace(";", ",").replace('\n', ' ').replace('\r', '')285    return(Result)286287###############################################################################â  288# LINKEDIN (INFOS EMPLOIS)289###############################################################################â290def Generation_Fichiers_avec_Metadonnees_EMP():291    myFilePathName = myPathCuratedZone + "EMP.txt"292    myFilePtr = open(myFilePathName, "w", encoding = "utf-8")293    myListeDeLigneAEcrire = [] 294    #emp linkied295    myListeDeLigneAEcrire.append('cle_unique;emplacement_source;datetime_ingestion;privacy_level;poste;entreprise;location;date_publication;nro_candidates;description_job;hierarchie;type_emploi;function;secteurs'+"\n")296    297    for myEntry in myListOfFileEMP :  298        f = open(myPathHtmlEMP+myEntry, "r", encoding="utf8")299        myHTMLContents = f.read()300        f.close() 301        mySoup = BeautifulSoup(myHTMLContents, 'lxml')302        cle_unique=str(random.getrandbits(128))303        emplacement_source=myEntry304        datetime_ingestion=str(Get_datetime_ingestion_AVI())305        privacy_level="0"306        poste = Get_libelle_emploi_EMP(mySoup)307        entreprise = Get_nom_entreprise_EMP(mySoup)308        location = Get_ville_emploi_EMP(mySoup)309        date_publication= Get_date_emploi_EMP(mySoup)310        nro_candidates=Get_candidats_emploi_EMP(mySoup)311        description_job=mySoup.find_all('div', attrs = {'class':'description__text description__text--rich'})[0].text312        if len(mySoup.find_all('ul', attrs = {'class':'job-criteria__list'})[0].contents) == 4 :313            hierarchie=str(mySoup.find_all('ul', attrs = {'class':'job-criteria__list'})[0].contents[0].span.contents[0])314            type_emploi=str(mySoup.find_all('ul', attrs = {'class':'job-criteria__list'})[0].contents[1].span.contents[0])315            function=str(mySoup.find_all('ul', attrs = {'class':'job-criteria__list'})[0].contents[2].span.contents[0])316            secteurs=str(mySoup.find_all('ul', attrs = {'class':'job-criteria__list'})[0].contents[3].span.contents[0])317        myListeDeLigneAEcrire.append(cle_unique+";"+emplacement_source+";"+datetime_ingestion+";"+privacy_level+";"+poste+";"+entreprise+";"+location+";"+date_publication+";"+nro_candidates+";"+description_job+";"+hierarchie+";"+type_emploi+";"+function+";"+secteurs + '\n')318    319    myFilePtr.writelines(myListeDeLigneAEcrire)320    myFilePtr.close()
...test_test_runner.py
Source:test_test_runner.py  
1# Licensed under the Apache License, Version 2.0 (the "License"); you may2# not use this file except in compliance with the License. You may obtain3# a copy of the License at4#5#      http://www.apache.org/licenses/LICENSE-2.06#7# Unless required by applicable law or agreed to in writing, software8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the10# License for the specific language governing permissions and limitations11# under the License.12import os13import sys14import fixtures15import mock16from oslo_config import cfg17from oslo_log import log as logging18from oslo_utils import importutils19import six20import testtools21from murano.cmd import test_runner22from murano import version23CONF = cfg.CONF24logging.register_options(CONF)25logging.setup(CONF, 'murano')26class TestCaseShell(testtools.TestCase):27    def setUp(self):28        super(TestCaseShell, self).setUp()29        self.auth_params = {'username': 'test',30                            'password': 'test',31                            'project_name': 'test',32                            'auth_url': 'http://localhost:5000'}33        self.args = ['test-runner.py']34        for k, v in self.auth_params.items():35            k = '--os-' + k.replace('_', '-')36            self.args.extend([k, v])37        self.useFixture(fixtures.MonkeyPatch('keystoneclient.v3.client.Client',38                                             mock.MagicMock))39        dirs = [os.path.dirname(__file__),40                os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,41                             os.pardir, os.pardir, os.pardir, 'meta')]42        self.override_config('load_packages_from', dirs, 'engine')43    def tearDown(self):44        super(TestCaseShell, self).tearDown()45        CONF.clear()46    def override_config(self, name, override, group=None):47        CONF.set_override(name, override, group)48        CONF.set_override('use_stderr', True)49        self.addCleanup(CONF.clear_override, name, group)50    def shell(self, cmd_args=None, exitcode=0):51        stdout = six.StringIO()52        stderr = six.StringIO()53        args = self.args54        if cmd_args:55            cmd_args = cmd_args.split()56            args.extend(cmd_args)57        with mock.patch.object(sys, 'stdout', stdout):58            with mock.patch.object(sys, 'stderr', stderr):59                with mock.patch.object(sys, 'argv', args):60                    result = self.assertRaises(SystemExit, test_runner.main)61                    self.assertEqual(result.code, exitcode,62                                     'Command finished with error.')63        stdout = stdout.getvalue()64        stderr = stderr.getvalue()65        return (stdout, stderr)66    def test_help(self):67        stdout, _ = self.shell('--help')68        usage = """usage: murano-test-runner [-h] [--config-file CONFIG_FILE]69                          [--os-auth-url OS_AUTH_URL]70                          [--os-username OS_USERNAME]71                          [--os-password OS_PASSWORD]72                          [--os-project-name OS_PROJECT_NAME]73                          [-l [</path1, /path2> [</path1, /path2> ...]]] [-v]74                          [--version]75                          <PACKAGE_FQN>76                          [<testMethod1, className.testMethod2> [<testMethod1, className.testMethod2"""  # noqa77        self.assertIn(usage, stdout)78    def test_version(self):79        stdout, stderr = self.shell('--version')80        if six.PY3:81            output = stdout82        else:83            output = stderr84        self.assertIn(version.version_string, output)85    @mock.patch.object(test_runner, 'LOG')86    def test_increase_verbosity(self, mock_log):87        self.shell('io.murano.test.MyTest1 -v')88        mock_log.logger.setLevel.assert_called_with(logging.DEBUG)89    @mock.patch('keystoneclient.v3.client.Client')90    def test_os_params_replaces_config(self, mock_client):91        # Load keystone configuration parameters from config92        importutils.import_module('keystonemiddleware.auth_token')93        self.override_config('admin_user', 'new_value', 'keystone_authtoken')94        self.shell('io.murano.test.MyTest1 io.murano.test.MyTest2')95        mock_client.assert_has_calls([mock.call(**self.auth_params)])96    def test_package_all_tests(self):97        _, stderr = self.shell('io.murano.test.MyTest1 -v')98        # NOTE(efedorova): May be, there is a problem with test-runner, since99        # all logs are passed to stderr100        self.assertIn('Test io.murano.test.MyTest1.testSimple1 successful',101                      stderr)102        self.assertIn('Test io.murano.test.MyTest1.testSimple2 successful',103                      stderr)104        self.assertIn('Test io.murano.test.MyTest2.testSimple1 successful',105                      stderr)106        self.assertIn('Test io.murano.test.MyTest2.testSimple2 successful',107                      stderr)108        self.assertNotIn('thisIsNotAtestMethod', stderr)109    def test_package_by_class(self):110        _, stderr = self.shell(111            'io.murano.test.MyTest1 io.murano.test.MyTest2 -v')112        self.assertNotIn('Test io.murano.test.MyTest1.testSimple1 successful',113                         stderr)114        self.assertNotIn('Test io.murano.test.MyTest1.testSimple2 successful',115                         stderr)116        self.assertIn('Test io.murano.test.MyTest2.testSimple1 successful',117                      stderr)118        self.assertIn('Test io.murano.test.MyTest2.testSimple2 successful',119                      stderr)120    def test_package_by_test_name(self):121        _, stderr = self.shell(122            'io.murano.test.MyTest1 testSimple1 -v')123        self.assertIn('Test io.murano.test.MyTest1.testSimple1 successful',124                      stderr)125        self.assertNotIn('Test io.murano.test.MyTest1.testSimple2 successful',126                         stderr)127        self.assertIn('Test io.murano.test.MyTest2.testSimple1 successful',128                      stderr)129        self.assertNotIn('Test io.murano.test.MyTest2.testSimple2 successful',130                         stderr)131    def test_package_by_test_and_class_name(self):132        _, stderr = self.shell(133            'io.murano.test.MyTest1 io.murano.test.MyTest2.testSimple1 -v')134        self.assertNotIn('Test io.murano.test.MyTest1.testSimple1 successful',135                         stderr)136        self.assertNotIn('Test io.murano.test.MyTest1.testSimple2 successful',137                         stderr)138        self.assertIn('Test io.murano.test.MyTest2.testSimple1 successful',139                      stderr)140        self.assertNotIn('Test io.murano.test.MyTest2.testSimple2 successful',141                         stderr)142    def test_service_methods(self):143        _, stderr = self.shell(144            'io.murano.test.MyTest1 io.murano.test.MyTest1.testSimple1 -v')145        self.assertIn('Executing: io.murano.test.MyTest1.setUp', stderr)146        self.assertIn('Executing: io.murano.test.MyTest1.tearDown', stderr)147    def test_package_is_not_provided(self):148        _, stderr = self.shell(exitcode=2)149        if six.PY3:150            err = 'the following arguments are required: '151        else:152            err = 'too few arguments'153        self.assertIn('murano-test-runner: error: %s' % err, stderr)154    def test_wrong_parent(self):155        _, stderr = self.shell(156            'io.murano.test.MyTest1 io.murano.test.MyTest3 -v', exitcode=1)157        self.assertIn('Class io.murano.test.MyTest3 is not inherited from'158                      ' io.murano.test.TestFixture. Skipping it.', stderr)...test_cmd_report.py
Source:test_cmd_report.py  
...10    @lcc.test("My Test 1")11    def mytest1(self):12        lcc.log_error("failure")13    @lcc.test("My Test 2")14    def mytest2(self):15        pass16@lcc.suite()17class suite_with_debug:18    @lcc.test()19    def test(self):20        lcc.set_step("step 1")21        lcc.log_info("1_info_message")22        lcc.log_debug("1_debug_message")23        lcc.set_step("step 2")24        lcc.log_debug("2_debug_message")25    @lcc.test("My Test 2")26    def mytest2(self):27        pass28def test_report_from_dir(tmpdir, cmdout):29    run_suite_class(mysuite, tmpdir=tmpdir, backends=[JsonBackend()])30    assert main(["report", tmpdir.strpath, "--short"]) == 031    assert_run_output(cmdout, "mysuite", successful_tests=["mytest2"], failed_tests=["mytest1"])32def test_report_from_file(tmpdir, cmdout):33    backend = JsonBackend()34    run_suite_class(mysuite, tmpdir=tmpdir, backends=[backend])35    assert main(["report", osp.join(tmpdir.strpath, backend.get_report_filename()), "--short"]) == 036    assert_run_output(cmdout, "mysuite", successful_tests=["mytest2"], failed_tests=["mytest1"])37def test_report_with_filter(tmpdir, cmdout):38    run_suite_class(mysuite, tmpdir=tmpdir, backends=[JsonBackend()])39    assert main(["report", tmpdir.strpath, "--passed", "--short"]) == 040    assert_run_output(cmdout, "mysuite", successful_tests=["mytest2"])...test_rcpyclass.py
Source:test_rcpyclass.py  
1from pypy.translator.c.test.test_genc import compile2from pypy.rpython.rcpy import cpy_export, cpy_import, CPyTypeInterface3from pypy.rpython.rcpy import cpy_typeobject4from pypy.rpython.lltypesystem import lltype5class W_MyTest(object):6    x = 6007    def __init__(self, x):8        self.x = x9    def double(self):10        return self.x * 211mytest = CPyTypeInterface('mytest', {})12def test_cpy_export():13    def f():14        w = W_MyTest(21)15        return cpy_export(mytest, w)16    fn = compile(f, [])17    res = fn(expected_extra_mallocs=1)18    assert type(res).__name__ == 'mytest'19def test_cpy_import():20    def f():21        w = W_MyTest(21)22        return cpy_export(mytest, w)23    def g():24        obj = f()25        w = cpy_import(W_MyTest, obj)26        return w.double()27    fn = compile(g, [])28    res = fn()29    assert res == 4230def test_tp_dealloc():31    class A(object):32        pass33    def f():34        w = W_MyTest(21)35        w.a = A()36        w.a.x = 437        return cpy_export(mytest, w)38    def g():39        obj = f()40        w = cpy_import(W_MyTest, obj)41        return w.a.x42    fn = compile(g, [])43    res = fn()44    # the A() should have been deallocated too, otherwise the number45    # of mallocs doesn't match the number of frees46    assert res == 447def test_manipulate_more():48    def f(input):49        current = total = 050        if input:51            w = cpy_import(W_MyTest, input)52            current, total = w.stuff53        w = W_MyTest(21)54        current += 155        total += current56        w.stuff = current, total57        return cpy_export(mytest, w), total58    fn = compile(f, [object])59    obj, total = fn(None, expected_extra_mallocs=2) # 1 W_MyTest (with 1 tuple)60    assert total == 161    obj, total = fn(obj, expected_extra_mallocs=4)  # 2 W_MyTests alive62    assert total == 363    obj, total = fn(obj, expected_extra_mallocs=4)  # 2 W_MyTests alive64    assert total == 665    obj, total = fn(obj, expected_extra_mallocs=4)  # etc66    assert total == 1067    obj, total = fn(obj, expected_extra_mallocs=4)68    assert total == 1569    obj, total = fn(obj, expected_extra_mallocs=4)70    assert total == 2171def test_instantiate_from_cpython():72    def f(input):73        if input:74            w = cpy_import(W_MyTest, input)75        else:76            w = W_MyTest(21)77        w.x += 178        return cpy_export(mytest, w), w.x79    fn = compile(f, [object])80    obj, x = fn(None, expected_extra_mallocs=1) # 1 W_MyTest81    assert x == 2282    obj2 = type(obj)()83    del obj84    obj, x = fn(obj2, expected_extra_mallocs=1) # 1 W_MyTest (obj2)85    assert obj is obj286    assert x == 601     # 600 is the class default of W_MyTest.x87def test_subclass_from_cpython():88    import py; py.test.skip("not implemented (see comments in rcpy.py)")89    def f(input):90        current = total = 1091        if input:92            w = cpy_import(W_MyTest, input)93            current, total = w.stuff94        w = W_MyTest(21)95        current += 196        total += current97        w.stuff = current, total98        return cpy_export(mytest, w), total99    fn = compile(f, [object])100    obj, total = fn(None, expected_extra_mallocs=2) # 1 W_MyTest (with 1 tuple)101    assert total == 21102    T = type(obj)103    class U(T):104        pass105    obj2 = U()106    obj2.bla = 123107    assert obj2.bla == 123108    del obj109    objlist = [U() for i in range(100)]110    obj, total = fn(obj2, expected_extra_mallocs=204) # 102 W_MyTests alive111    assert total == 1112    del objlist113    obj, total = fn(obj, expected_extra_mallocs=6) # 3 W_MyTests alive114    assert total == 3115def test_export_constant():116    mytest2 = CPyTypeInterface('mytest2', {'hi': lltype.pyobjectptr(123)})117    def f():118        w = W_MyTest(21)119        return cpy_export(mytest2, w)120    fn = compile(f, [])121    obj = fn(expected_extra_mallocs=1)122    assert obj.hi == 123123    assert type(obj).hi == 123124def test_export_two_constants():125    mytest2 = CPyTypeInterface('mytest2', {'hi': lltype.pyobjectptr(123),126                                           'there': lltype.pyobjectptr("foo")})127    def f():128        w = W_MyTest(21)129        return cpy_export(mytest2, w)130    fn = compile(f, [])131    obj = fn(expected_extra_mallocs=1)132    assert obj.hi == 123133    assert type(obj).hi == 123134    assert obj.there == "foo"135    assert type(obj).there == "foo"136def test_cpy_typeobject():137    def f():138        return cpy_typeobject(mytest, W_MyTest)139    fn = compile(f, [])140    typeobj = fn()141    assert isinstance(typeobj, type)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
