Best Python code snippet using green
test_metadata_updater_integration.py
Source:test_metadata_updater_integration.py  
#!/usr/bin/python3
"""
Integration tests for metadata_updater.

The tests read the API domain, API key, layer ids and destination
directory from data/config.yaml (resolved against the current working
directory) and call the remote data.linz.govt.nz API.
"""
import unittest
import koordinates
import os
import sys
import types
import requests
import re
import json

# metadata_updater lives one directory up from the tests
sys.path.append('../')
from metadata_updater import metadata_updater


class TestMetadataUpdaterUpdFile(unittest.TestCase):
    """
    Tests for the individual metadata_updater helper functions
    """

    def setUp(self):
        """
        Get reference to config object
        """
        conf_file = os.path.join(os.getcwd(), 'data/config.yaml')
        self.config = metadata_updater.ConfigReader(conf_file)
        self.lds_test_layer = '95322'

    def get_client(self):
        """
        Non test.
        Returns koordinates api client instance
        """
        return metadata_updater.get_client(self.config.domain, self.config.api_key)

    def _download_metadata(self, layer):
        """
        Non test.
        Calls get_metadata() for the layer using the configured
        destination directory and overwrite flag. Returns whatever
        get_metadata() returns (the tests treat it as a file path)
        """
        destination_dir = os.path.join(os.getcwd(), self.config.destination_dir)
        test_overwrite = self.config.test_overwrite
        return metadata_updater.get_metadata(layer, destination_dir, test_overwrite)

    @staticmethod
    def _remove_if_present(path):
        """
        Non test.
        Deletes path when it is an existing file. Registered via
        addCleanup so the file is removed even if an assertion fails
        """
        if path and os.path.isfile(path):
            os.remove(path)

    def _version_url_regex(self):
        """
        Non test.
        Returns the compiled pattern a published item url for the
        test layer must match
        """
        # Raw string: '/' needs no escaping and '\/' is an invalid
        # escape sequence in a normal string literal
        return re.compile(r'https://data.linz.*{0}/versions/[0-9]*/'.format(self.lds_test_layer))

    def test_get_client(self):
        """
        test koordinates client instance
        1. Get client
        """
        client = self.get_client()
        self.assertIsInstance(client, koordinates.client.Client)

    def test_get_layer(self):
        """
        Test getting of layer instance
        1. Get client
        2. Get layer
        """
        client = self.get_client()
        layer_id = self.config.layers[0]
        layer = metadata_updater.get_layer(client, layer_id)
        self.assertIsInstance(layer, koordinates.layers.Layer)

    def test_get_metadata(self):
        """
        Test getting of meta data
        1. Get client
        2. Get layer
        3. Get Metadata
        4. Remove file (Clean up)
        """
        client = self.get_client()
        layer_id = self.config.layers[0]
        layer = metadata_updater.get_layer(client, layer_id)
        metadata_file = self._download_metadata(layer)
        self.addCleanup(self._remove_if_present, metadata_file)
        self.assertTrue(os.path.isfile(metadata_file))

    def test_update_metadata(self):
        """
        Not currently testing lds metadata updating
        could ensure a test data set is always left on the lds
        for this purpose.

        edit. check it has been edited
        and then revert changes for next test
        """
        pass

    def test_get_draft(self):
        """
        Test getting of draft
        1. Get client
        2. Get layer
        3. Get Draft
        """
        client = self.get_client()
        layer_id = self.config.layers[0]
        layer = metadata_updater.get_layer(client, layer_id)
        draft = metadata_updater.get_draft(layer)
        self.assertTrue(draft.is_draft_version)

    def test_iterate_all(self):
        """
        Test getting of "all" layers from catalog
        1. Get client
        2. Get 'All' layers
        """
        client = self.get_client()
        all_layers = metadata_updater.iterate_all(client)
        self.assertIsInstance(all_layers, types.GeneratorType)
        # The below results in v. slow tests
        self.assertTrue(len(list(all_layers)) > 0)

    def test_add_to_pub_group(self):
        """
        Test adding a draft to a publish group
        1. Get client
        2. Get pub instance
        3. Get layer
        4. Get Draft
        5. Add to pub group
        """
        client = self.get_client()
        publisher = koordinates.Publish()
        layer = metadata_updater.get_layer(client, self.lds_test_layer)
        draft = metadata_updater.get_draft(layer)
        metadata_updater.add_to_pub_group(publisher, draft)
        self.assertRegex(publisher.items[0], self._version_url_regex())

    def test_post_metadata(self):
        """
        Test posting of meta data
        1. Get client
        2. Get layer
        3. Get Metadata
        4. Post unedited file back
        5. Remove file (Clean up)
        """
        client = self.get_client()
        layer = metadata_updater.get_layer(client, self.lds_test_layer)
        draft = metadata_updater.get_draft(layer)
        metadata_file = self._download_metadata(layer)
        self.addCleanup(self._remove_if_present, metadata_file)
        result = metadata_updater.post_metadata(draft, metadata_file)
        self.assertTrue(result)

    def test_set_metadata(self):
        """
        1. Get Client
        2. Get Publisher
        3. Get Layer
        4. Get Metadata
        5. Set metadata
        6. Check draft is in pub group
        """
        client = self.get_client()
        publisher = koordinates.Publish()
        layer = metadata_updater.get_layer(client, self.lds_test_layer)
        metadata_file = self._download_metadata(layer)
        metadata_updater.set_metadata(layer, metadata_file, publisher)
        self.assertRegex(publisher.items[0], self._version_url_regex())


class TestMetadataUpdaterUpdFileActivePub(unittest.TestCase):
    r"""
    Purpose: test get_draft failure
    must hit

    ERRORS += 1
    logger.critical('A draft already exists for {0} and is in a ' \
                    'publish group. THIS HAS NOT BEEN UPDATED '.format(layer.id))
    """

    def setUp(self):
        """
        Get reference to config instance
        """
        conf_file = os.path.join(os.getcwd(), 'data/config.yaml')
        self.config = metadata_updater.ConfigReader(conf_file)
        self.lds_test_layer = '95322'
        # Set by the test once a publish group exists; tearDown cancels it
        self.pub_id = None

    def tearDown(self):
        """
        Clean up: cancel the publish group created by the test, if any
        """
        if self.pub_id is None:
            # The test failed before a publish group was created, so
            # there is nothing to cancel (and no valid url to DELETE)
            return
        #THIS WILL CREATE MANY CANCELLED PUB GROUPS
        url = 'https://data.linz.govt.nz/services/api/v1/publish/{0}/'.format(self.pub_id)
        header = {'Content-type': 'application/json', 'Authorization': 'key {0}'.format(self.config.api_key)}
        # cancel pub group
        requests.delete(url, headers=header)

    def publish(self, lds_apikey, layer, version):
        """
        Non test.
        Publishing must not go through. Hence the manual publish flag
        Unsure how to set this flag via python client lib hence
        going straight to the API

        Returns the decoded json body of the API response
        """
        url = 'https://data.linz.govt.nz/services/api/v1/publish/'
        header = {'Content-type': 'application/json',
                  'Authorization': 'key {0}'.format(lds_apikey)}
        payload = {
            "items": [
                "https://data.linz.govt.nz/services/api/v1/layers/{0}/versions/{1}/".format(layer.id, version)
            ],
            "publish_strategy": "manual",
            "error_strategy": "abort",
            "reference": "metadata_update_test"
        }

        response = requests.post(url, headers=header, data=json.dumps(payload))
        return response.json()

    def test_active_pub_group_assigned(self):
        """
        This is to test the fix to #23
        1. Get a draft
        2. add to pub group but with manual pub status
        3. try get another draft > Must fail as it is already associated with
        a publish group
        """
        client = metadata_updater.get_client(self.config.domain, self.config.api_key)
        layer = metadata_updater.get_layer(client, self.lds_test_layer)
        draft = metadata_updater.get_draft(layer)
        #Check response - should be part of active pub group
        result = self.publish(self.config.api_key, layer, draft.version)
        self.pub_id = result['id']
        # assertEqual, not assertTrue: assertTrue's second argument is
        # only the failure message, so the state was never compared
        self.assertEqual(result['state'], 'waiting-for-approval')
        # Should fail as draft now has an active pub group.
        # These are to get logged out
        result = metadata_updater.get_draft(layer)
        self.assertIsNone(result)


if __name__ == '__main__':
    ...  # body elided in the excerpt
# next snippet: test_lib.py
Source:test_lib.py  
...41    def test_write(self):42        write_key("test_write.txt", "test_write")43        data = read_key_or_default("test_write.txt", "Failed")44        self.assertEqual(data, b"test_write")45    def test_overwrite(self):46        write_key("test_overwrite.txt", "TextBefore")47        write_key("test_overwrite.txt", "test_overwrite")48        data = read_key_or_default("test_overwrite.txt", "Failed")49        self.assertEqual(data, b"test_overwrite")50@mock_s351class test_read_key_or_default(unittest.TestCase):52    def test_real(self):53        write_key("test_real.txt", "test_real")54        data = read_key_or_default("test_real.txt", "Failed")55        self.assertEqual(data, b"test_real")56    def test_default(self):57        data = read_key_or_default("test_default.txt", "test_default")58        self.assertEqual(data, b"test_default")59class test_read_file(unittest.TestCase):...test.py
Source:test.py  
...78    def test_absolute(self):9        self.assertEqual(self.var, -10)1011    def test_overwrite(self):12        self.var = -1013        self.assertTrue(self.var < 0)141516def suite():17    Suite = unittest.TestSuite()18    unittest.main(TestRepetitiveCode('test_absolute'))19    unittest.main(TestRepetitiveCode('test_overwrite'))20    return Suite212223if __name__ == "__main__":24    runner = unittest.TextTestRunner()25    runner.run(suite())
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
