Best Python code snippet using autotest_python
datasource_tests.py
Source:datasource_tests.py  
...57        conn_ = MagicMock(return_value=con)58        ds = DataSource.factory(self.DB)59        with patch('psycopg2.connect', conn_):60            query = 'select * from tbl;'61            ds._get_iter(query=query).next()62            exec_.assert_called_with(query)63            exec_.reset_mock()64            query = 'select * from tbl'65            ds._get_iter(query=query).next()66            exec_.assert_called_with(query + ';')67            # query is required68            self.assertRaises(69                ImportHandlerException, ds._get_iter, None)70            self.assertRaises(71                ImportHandlerException, ds._get_iter, ' ')72    def test_db_datasource_invalid_definition(self):73        # Vendor is invalid74        config = objectify.fromstring(75            """<db name="odw"76                host="localhost"77                dbname="odw"78                user="postgres"79                password="postgres"80                vendor="invalid" />""")81        ds = DataSource.factory(config)82        self.assertRaises(ImportHandlerException, ds._get_iter, 'query')83        # Host isn't specified84        config = objectify.fromstring(85            """<db name="odw"86                dbname="odw"87                user="postgres"88                password="postgres"89                vendor="postgres" />""")90        ds = DataSource.factory(config)91        self.assertRaises(ImportHandlerException, ds._get_iter, 'query')92    def test_http_data_source(self):93        mock = MagicMock()94        mock.json.return_value = {"key": "val"}95        with patch('requests.request', mock):96            ds = HttpDataSource(self.HTTP)97            ds._get_iter()98            mock.assert_called_with(99                'GET', 'http://upwork.com/jar/', stream=True)100            mock.reset_mock()101            # query_target isn't supported102            self.assertRaises(103                ImportHandlerException, ds._get_iter, '', 'query_target')104        # 
url is required105        config = objectify.fromstring(106            """<http name="jar" method="GET" url="" />""")107        self.assertRaises(108            ImportHandlerException, HttpDataSource, config)109        config = objectify.fromstring(110            """<http name="jar" method="GET" />""")111        self.assertRaises(112            ImportHandlerException, HttpDataSource, config)113    def test_csv_datasource(self):114        ds = CsvDataSource(self.CSV)115        self.assertItemsEqual(116            ds.headers, [('id', 0), ('name', 2), ('score', 3)])117        res = ds._get_iter().next()118        self.assertEquals(119            res, {'score': 'score', 'id': 'id', 'name': 'name'})120        ds = CsvDataSource(self.CSV_WITHOUT_HEADER)121        iter_ = ds._get_iter()122        res = iter_.next()123        self.assertEquals(124            res, {'3': 'score1',125                  '0': 'id1',126                  '5': [1, 2, 3],127                  '2': 'name1',128                  '4': {u'key': u'val'},129                  '1': 'type1'})130        res = iter_.next()131        self.assertEquals(132            res, {'2': 'name2',133                  '5': '',134                  '3': 'score2',135                  '4': '{{val}}',136                  '1': 'type2',137                  '0': 'id2'})138        # src is missing139        config = objectify.fromstring(140            """<csv name="jar" method="GET" />""")141        self.assertRaises(142            ImportHandlerException, CsvDataSource, config)143        config = objectify.fromstring(144            """<csv name="csvDataSource" src="%s/stats.csv">145                <!-- Note that some columns are ignored -->146                <header name="id" index="0" />147                <header name="name" index="2" />148                <header name="score" index="10" />149            </csv>""" % BASEDIR)150        ds = CsvDataSource(config)151        iter_ = ds._get_iter()152        
self.assertRaises(ImportHandlerException, iter_.next)153    def test_input_datasource(self):154        ds = InputDataSource(self.INPUT)155        ds._get_iter('{"key": "val"}')156    def test_factory(self):157        config = objectify.fromstring("""<invalid />""")158        self.assertRaises(159            ImportHandlerException, DataSource.factory, config)160        config = objectify.fromstring("""<db name="" />""")161        self.assertRaises(162            ImportHandlerException, DataSource.factory, config)163        ds = DataSource.factory(self.DB)164        self.assertEquals(type(ds), DbDataSource)165        self.assertEquals(ds.type, 'db')166        ds = DataSource.factory(self.HTTP)167        self.assertEquals(type(ds), HttpDataSource)168        self.assertEquals(ds.type, 'http')169        ds = DataSource.factory(self.CSV)170        self.assertEquals(type(ds), CsvDataSource)171        self.assertEquals(ds.type, 'csv')172        ds = DataSource.factory(self.PIG)173        self.assertEquals(type(ds), PigDataSource)174        self.assertEquals(ds.type, 'pig')175        ds = DataSource.factory(self.INPUT)176        self.assertEquals(type(ds), InputDataSource)177        self.assertEquals(ds.type, 'input')178def conn_exec_print(cursor, query):179    print "Query is", query180class DbDataSourceTests(unittest.TestCase):181    def setUp(self):182        self.datasource = DataSource.factory(DataSourcesTest.DB)183        self.assertEquals(type(self.datasource), DbDataSource)184    @patch('cloudml.importhandler.db.execute', side_effect=conn_exec_print)185    def test_sql_injection_on_query_target(self, exec_mock):186        query = 'SELECT * FROM pg_catalog.pg_tables'187        iter_ = self.datasource._get_iter(188            query, query_target='target_tbl')189        with self.assertRaises(ValueError):190            self.datasource._get_iter(191                query,192                query_target='target_tbl;delete * from tbl3;')193class 
PigDataSourceTests(unittest.TestCase):194    PLACEBO_RESPONSES_DIR = os.path.abspath(195        os.path.join(os.path.dirname(__file__),196                     'placebo_responses/datasource/'))197    def setUp(self):198        super(PigDataSourceTests, self).setUp()199        self.pill = StreamPill(debug=True)200        self.session = boto3.session.Session()201        boto3.DEFAULT_SESSION = self.session202    @patch('time.sleep', return_value=None)203    def test_get_iter_existing_job(self, sleep_mock):204        # Amazon mock205        self.pill.attach(self.session,206                         os.path.join(self.PLACEBO_RESPONSES_DIR,207                                      'get_iter_existing_job'))208        self.pill.playback()209        pig_import = 'cloudml.importhandler.datasources.PigDataSource'210        ds = PigDataSource(DataSourcesTest.PIG)211        # test correct case with existing job212        # (DescribeJobFlows_1: 2 steps exist before adding a new one)213        # (DescribeJobFlows_2: RUNNING, RUNNING)214        # (DescribeJobFlows_3: WAITING, COMPLETED)215        ds.jobid = "1234"216        with patch("{}.get_result".format(pig_import), MagicMock()):217            with patch("{}._process_running_state".format(pig_import)) \218                    as run_handler:219                with patch("{}._process_waiting_state".format(pig_import)) \220                        as wait_handler:221                    ds._get_iter('query here', 'query target')222                    # step_number is 3223                    run_handler.assert_called_with(ANY, 'RUNNING', 3)224                    wait_handler.assert_called_with(ANY, 'COMPLETED', 3)225    @patch('time.sleep', return_value=None)226    def test_get_iter_create_job(self, sleep_mock):227        # Amazon mock228        self.pill.attach(self.session,229                         os.path.join(self.PLACEBO_RESPONSES_DIR,230                                      'get_iter_create_job'))231        
self.pill.playback()232        pig_import = 'cloudml.importhandler.datasources.PigDataSource'233        ds = PigDataSource(DataSourcesTest.PIG)234        ds.jobid = None235        with patch("{}.get_result".format(pig_import), MagicMock()):236            with patch("{}._process_completed_state".format(pig_import)) as \237                    complete_handler:238                ds._get_iter('query here', 'query target')239                self.assertEqual("234", ds.jobid)240                # step_number is 1241                complete_handler.assert_called_with(ANY, 'COMPLETED', 1)242    @patch('time.sleep', return_value=None)243    def test_get_iter_check_statuses(self, sleep_mock):244        # Amazon mock245        self.pill.attach(self.session,246                         os.path.join(self.PLACEBO_RESPONSES_DIR,247                                      'get_iter_statuses'))248        self.pill.playback()249        pig_import = 'cloudml.importhandler.datasources.PigDataSource'250        ds = PigDataSource(DataSourcesTest.PIG)251        self.assertRaises(ProcessException, ds._get_iter, 'query here')252        _store_query_to_s3 = MagicMock(return_value="s3://bucket/script.jar")253        clear_output_folder = MagicMock()254        _run_steps_on_existing_jobflow = MagicMock(return_value=1)255        get_result = MagicMock()256        _get_log = MagicMock(return_value="Some log")257        ds.jobid = "234"258        with patch("{}._store_query_to_s3".format(pig_import),259                   _store_query_to_s3):260            with patch("{}.clear_output_folder".format(pig_import),261                       clear_output_folder):262                with patch("{}.get_result".format(pig_import), get_result):263                    with patch("{}._run_steps_on_existing_jobflow".format(264                            pig_import, _run_steps_on_existing_jobflow)):265                        with patch("{}._get_log".format(pig_import), _get_log):266                            # test 
failed case with new job267                            # (DescribeJobFlows_1: FAILED, FAILED)268                            self.assertRaises(ImportHandlerException,269                                              ds._get_iter,270                                              "query here", "query target")271                            # test failed case with new job272                            # (DescribeJobFlows_2: COMPLETED, FAILED)273                            self.assertRaises(ImportHandlerException,274                                              ds._get_iter,275                                              "query here", "query target")276                            # test failed case with new job277                            # (DescribeJobFlows_3: WAITING, FAILED)278                            self.assertRaises(ImportHandlerException,279                                              ds._get_iter,280                                              "query here", "query target")281                            # unexpected status check282                            # (DescribeJobFlows_4: COMPLETED, UNEXPECTED)283                            with patch("{}._process_completed_state".format(284                                    pig_import)) as complete_handler:285                                ds._get_iter('query here', 'query target')286                                complete_handler.assert_called_with(287                                    ANY, 'UNEXPECTED', 1)288                            # unexpected and completed status check289                            # (DescribeJobFlows_5: UNEXPECTED, UNEXPECTED)290                            # (DescribeJobFlows_6: WAITING, PENDING)291                            # (DescribeJobFlows_7: COMPLETED, COMPLETED)292                            with patch("{}._process_waiting_state".format(293                                    pig_import)) as waiting_handler:294                                with 
patch("{}._process_completed_state".295                                           format(pig_import)) as \296                                        complete_handler:297                                    ds._get_iter('query here', 'query target')298                                    waiting_handler.assert_called_with(299                                        ANY, 'PENDING', 1)300                                    complete_handler.assert_called_with(301                                        ANY, 'COMPLETED', 1)302                            # running and completed status check303                            # (DescribeJobFlows_8: RUNNING, RUNNING)304                            # (DescribeJobFlows_9: WAITING, COMPLETED)305                            with patch("{}._process_running_state".format(306                                    pig_import)) as run_handler:307                                with patch("{}._process_waiting_state".format(308                                        pig_import)) as wait_handler:309                                    ds._get_iter('query here', 'query target')310                                    run_handler.assert_called_with(311                                        ANY, 'RUNNING', 1)312                                    wait_handler.assert_called_with(313                                        ANY, 'COMPLETED', 1)314                            # DescribeJobFlows_10 - corrupted response315                            # (no ExecutionStatusDetail)316                            self.assertRaises(ImportHandlerException,317                                              ds._get_iter,318                                              "query here", "query target")319                            # DescribeJobFlows_11 - corrupted response320                            # (no State)321                            self.assertRaises(ImportHandlerException,322                                              ds._get_iter,323                             
                 "query here", "query target")...test_iterators.py
Source:test_iterators.py  
...18            cmds.polyColorPerVertex(r=i*0.01, g=i*0.02, b=i*0.05)19            cmds.polyColorPerVertex(a=i*0.1)20    def test_props(self):21        miter = om2.MItMeshVertex(self.dagpath)22        iter = self._get_iter()23        for vtx in iter:24            self.assertEqual(vtx.index, miter.index())25            self.assertSequenceEqual(list(vtx.connected_edge_indices), list(miter.getConnectedEdges()))26            self.assertSequenceEqual(list(vtx.connected_face_indices), list(miter.getConnectedFaces()))27            self.assertSequenceEqual(list(vtx.connected_vertex_indices), list(miter.getConnectedVertices()))28            self.assertEqual(vtx.connected_edge_count, miter.numConnectedEdges())29            self.assertEqual(vtx.connected_face_count, miter.numConnectedEdges())30            self.assertEqual(vtx.on_boundary, miter.onBoundary())31            miter.next()32    def test_color(self):33        miter = om2.MItMeshVertex(self.dagpath)34        iter = self._get_iter()35        for vtx in iter:36            self.assertEqual(vtx.has_color(), miter.hasColor())37            for f in range(miter.numConnectedFaces()):38                self.assertEqual(vtx.has_color(f), miter.hasColor(f))39            self.assertEqual(vtx.color(None, self.color), miter.getColor(self.color))40            # for f in range(miter.numConnectedFaces()):41            #     self.assertEqual(vtx.color(f, self.color), miter.color(f. 
self.color))42            self.assertSequenceEqual(list(vtx.colors(self.color)), list(miter.getColors(self.color)))43            self.assertSequenceEqual(list(vtx.color_indices(self.color)), list(miter.getColorIndices(self.color)))44            miter.next()45    def test_normal(self):46        miter = om2.MItMeshVertex(self.dagpath)47        iter = self._get_iter()48        for vtx in iter:49            self.assertEqual(vtx.normal(), miter.getNormal())50            # for f in range(miter.numConnectedFaces()):51            #     self.assertEqual(vtx.normal(f, om2.MSpace.kObject), miter.getNormal(f, om2.MSpace.kObject))52            self.assertSequenceEqual(list(vtx.normals()), list(miter.getNormals()))53            self.assertSequenceEqual(list(vtx.normal_indices()), list(miter.getNormalIndices()))54            miter.next()55    def test_uv(self):56        miter = om2.MItMeshVertex(self.dagpath)57        iter = self._get_iter()58        uv_set = 'map1'59        for vtx in iter:60            self.assertEqual(vtx.uv(uv_set), miter.getUV(uv_set))61            self.assertSequenceEqual(list(vtx.uv_indices(uv_set)), list(miter.getUVIndices(uv_set)))62            for f, uv in vtx.uvs(uv_set).items():63                self.assertSequenceEqual(uv, miter.getUV(f, uv_set))64            self.assertEqual(vtx.uv_count(uv_set), miter.numUVs(uv_set))65            miter.next()66    def test_position(self):67        miter = om2.MItMeshVertex(self.dagpath)68        iter = self._get_iter()69        for vtx in iter:70            self.assertSequenceEqual(list(vtx.position()), list(miter.position()))71            miter.next()72    def _get_iter(self):73        mcomp = om2.MFnSingleIndexedComponent()74        mobj = mcomp.create(om2.MFn.kMeshVertComponent)75        mcomp.setCompleteData(cmds.polyEvaluate(self.shape1, vertex=True))76        comp = qm.MeshVertex(mobj, self.dagpath)77        return qm.MeshVertexIter(om2.MItMeshVertex(self.dagpath), comp, om2.MFnMesh(self.dagpath))78class 
TestMeshFaceIter(unittest.TestCase):79    def setUp(self) -> None:80        cmds.file(new=True, force=True)81        cube1, _ = cmds.polyCube()82        self.shape1 = cmds.ls(cmds.listRelatives(cube1, shapes=True), long=True)[0]83        self.dagpath = om2.MGlobal.getSelectionListByName(self.shape1).getDagPath(0)84    def test_props(self):85        miter = om2.MItMeshPolygon(self.dagpath)86        iter = self._get_iter()87        for face in iter:88            self.assertEqual(face.index, miter.index())89            self.assertSequenceEqual(list(face.connected_vertex_indices), list(miter.getConnectedVertices()))90            self.assertSequenceEqual(list(face.connected_face_indices), list(miter.getConnectedFaces()))91            self.assertSequenceEqual(list(face.connected_edge_indices), list(miter.getConnectedEdges()))92            self.assertSequenceEqual(list(face.edge_indices), list(miter.getEdges()))93            self.assertSequenceEqual(list(face.vertex_indices), list(miter.getVertices()))94            self.assertEqual(face.is_convex, miter.isConvex())95            self.assertEqual(face.is_lamina, miter.isLamina())96            self.assertEqual(face.is_planar, miter.isPlanar())97            self.assertEqual(face.is_zero_area, miter.zeroArea())98            self.assertEqual(face.on_boundary, miter.onBoundary())99            self.assertEqual(face.triangle_count, miter.numTriangles())100            miter.next()101    def test_geom(self):102        miter = om2.MItMeshPolygon(self.dagpath)103        iter = self._get_iter()104        for face in iter:105            self.assertEqual(face.center(), miter.center())106            self.assertEqual(face.area(), miter.getArea())107            self.assertSequenceEqual(list(face.points()), list(miter.getPoints()))108            miter.next()109    def test_normal(self):110        miter = om2.MItMeshPolygon(self.dagpath)111        iter = self._get_iter()112        for face in iter:113            
self.assertEqual(face.normal(), miter.getNormal())114            # for v in miter.getVertices():115            #     self.assertEqual(face.normal(v), miter.getNormal(v))116            self.assertSequenceEqual(list(face.normals()), list(miter.getNormals()))117            miter.next()118    def test_triangle(self):119        miter = om2.MItMeshPolygon(self.dagpath)120        iter = self._get_iter()121        for face in iter:122            for t in range(miter.numTriangles()):123                points, vertices = miter.getTriangle(t)124                for i, (v, p) in enumerate(face.triangle(t).items()):125                    self.assertEqual(v, vertices[i])126                    self.assertEqual(p, points[i])127            vertices = list(miter.getVertices())128            points = miter.getPoints()129            for tri in face.triangles():130                for v, p in tri.items():131                    self.assertIn(v, vertices)132                    self.assertEqual(p, points[vertices.index(v)])133            miter.next()134    def _get_iter(self):135        mcomp = om2.MFnSingleIndexedComponent()136        mobj = mcomp.create(om2.MFn.kMeshPolygonComponent)137        mcomp.setCompleteData(cmds.polyEvaluate(self.shape1, face=True))138        comp = qm.MeshFace(mobj, self.dagpath)139        return qm.MeshFaceIter(om2.MItMeshPolygon(self.dagpath), comp)140class TestMeshEdgeIter(unittest.TestCase):141    def setUp(self) -> None:142        cmds.file(new=True, force=True)143        cube1, _ = cmds.polyCube()144        self.shape1 = cmds.ls(cmds.listRelatives(cube1, shapes=True), long=True)[0]145        self.dagpath = om2.MGlobal.getSelectionListByName(self.shape1).getDagPath(0)146    def test_props(self):147        miter = om2.MItMeshEdge(self.dagpath)148        iter = self._get_iter()149        for edge in iter:150            self.assertEqual(edge.index, miter.index())151            self.assertSequenceEqual(list(edge.connected_edge_indices), 
list(miter.getConnectedEdges()))152            self.assertSequenceEqual(list(edge.connected_face_indices), list(miter.getConnectedFaces()))153            self.assertSequenceEqual(list(edge.vertex_indices), [miter.vertexId(0), miter.vertexId(1)])154            self.assertEqual(edge.on_boudary, miter.onBoundary())155            self.assertEqual(edge.is_smooth, miter.isSmooth)156            miter.next()157    def test_geom(self):158        miter = om2.MItMeshEdge(self.dagpath)159        iter = self._get_iter()160        for edge in iter:161            self.assertEqual(edge.center(), miter.center())162            self.assertEqual(edge.length(), miter.length())163            self.assertSequenceEqual(list(edge.points()), [miter.point(0), miter.point(1)])164            miter.next()165    def _get_iter(self):166        mcomp = om2.MFnSingleIndexedComponent()167        mobj = mcomp.create(om2.MFn.kMeshEdgeComponent)168        mcomp.setCompleteData(cmds.polyEvaluate(self.shape1, edge=True))169        comp = qm.MeshEdge(mobj, self.dagpath)170        return qm.MeshEdgeIter(om2.MItMeshEdge(self.dagpath), comp)171class TestMeshVertexFaceITer(unittest.TestCase):172    def setUp(self) -> None:173        cmds.file(new=True, force=True)174        cube1, _ = cmds.polyCube()175        self.shape1 = cmds.ls(cmds.listRelatives(cube1, shapes=True), long=True)[0]176        self.dagpath = om2.MGlobal.getSelectionListByName(self.shape1).getDagPath(0)177        cmds.select(cube1, replace=True)178        self.color = cmds.polyColorSet(create=True)[0]179        for i in range(cmds.polyEvaluate(self.shape1, vertex=True)):180            cmds.select(f'{self.shape1}.vtx[{i}]', replace=True)181            cmds.polyColorPerVertex(r=i*0.01, g=i*0.02, b=i*0.05)182            cmds.polyColorPerVertex(a=i*0.1)183    def test_props(self):184        miter = om2.MItMeshFaceVertex(self.dagpath)185        iter = self._get_iter()186        for vf in iter:187            self.assertSequenceEqual(list(vf.index), 
[miter.vertexId(), miter.faceId()])188            self.assertEqual(vf.vertex_index, miter.vertexId())189            self.assertEqual(vf.face_index, miter.faceId())190            self.assertEqual(vf.face_vertex_index, miter.faceVertexId())191            self.assertEqual(vf.has_color, miter.hasColor())192            miter.next()193    def test_color(self):194        miter = om2.MItMeshFaceVertex(self.dagpath)195        iter = self._get_iter()196        for fv in iter:197            self.assertEqual(fv.color(self.color), miter.getColor(self.color))198            self.assertEqual(fv.color_index(self.color), miter.getColorIndex(self.color))199            miter.next()200    def test_normal(self):201        miter = om2.MItMeshFaceVertex(self.dagpath)202        iter = self._get_iter()203        for fv in iter:204            self.assertEqual(fv.normal(), miter.getNormal())205            self.assertEqual(fv.tangent(om2.MSpace.kObject, 'map1'), miter.getTangent(om2.MSpace.kObject, 'map1'))206            self.assertEqual(fv.binormal(om2.MSpace.kObject, 'map1'), miter.getBinormal(om2.MSpace.kObject, 'map1'))207            self.assertEqual(fv.normal_index(), miter.normalId())208            self.assertEqual(fv.tangent_index(), miter.tangentId())209            miter.next()210    def test_uv(self):211        miter = om2.MItMeshFaceVertex(self.dagpath)212        iter = self._get_iter()213        uv_set = 'map1'214        for fv in iter:215            self.assertEqual(fv.has_uv(uv_set), miter.hasUVs(uv_set))216            self.assertEqual(fv.uv(uv_set), miter.getUV(uv_set))217            self.assertEqual(fv.uv_index(), miter.getUVIndex(uv_set))218            miter.next()219    def test_position(self):220        miter = om2.MItMeshFaceVertex(self.dagpath)221        iter = self._get_iter()222        for vf in iter:223            self.assertSequenceEqual(list(vf.position()), list(miter.position()))224            miter.next()225    def _get_iter(self):226        mcomp = 
om2.MFnDoubleIndexedComponent()227        mobj = mcomp.create(om2.MFn.kMeshVtxFaceComponent)228        mcomp.setCompleteData(cmds.polyEvaluate(self.shape1, vertex=True), cmds.polyEvaluate(self.shape1, face=True))229        comp = qm.MeshVertexFace(mobj, self.dagpath)...SetList.py
Source:SetList.py  
...7            for elem in init_list:8                self.append(elem)9    def __add__(self, list2):10        new = SetList(self)11        for elem in self._get_iter(list2):12            new.append(elem)13        return new14    @staticmethod15    def _get_iter(list2):16        """Return iterable list"""17        if not isinstance(list2, collections.Iterable):18            return [list2]19        else:20            return list221    def __iadd__(self, list2):22        for elem in self._get_iter(list2):23            self.append(elem)24        return self25    def __sub__(self, list2):26        new = SetList(self)27        for elem in self._get_iter(list2):28            if elem in new:29                new.remove(elem)30        return new31    def __isub__(self, list2):32        for elem in self._get_iter(list2):33            if elem in self:34                self.remove(elem)35        return self36    def append(self, elem):37        if elem not in self:38            list.append(self, elem)39    def extend(self, list2):40        for elem in list2:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
