Best Python code snippet using autotest_python
ProcessingTest.py
Source:ProcessingTest.py  
...81        self.assertEqual(targets, [])82    def test_run(self):83        self.sections['cli'].append(Setting('jobs', '1'))84        cache = FileCache(self.log_printer, 'coala_test', flush_cache=True)85        results = execute_section(self.sections['cli'],86                                  self.global_bears['cli'],87                                  self.local_bears['cli'],88                                  lambda *args: self.result_queue.put(args[2]),89                                  cache,90                                  self.log_printer,91                                  console_printer=self.console_printer)92        self.assertTrue(results[0])93        local_results = self.result_queue.get(timeout=0)94        global_results = self.result_queue.get(timeout=0)95        self.assertTrue(self.result_queue.empty())96        self.assertEqual(len(local_results), 1)97        self.assertEqual(len(global_results), 1)98        # Result dict also returned99        # One file100        self.assertEqual(len(results[1]), 1)101        # One global bear102        self.assertEqual(len(results[2]), 1)103        local_result = local_results[0]104        global_result = global_results[0]105        self.assertRegex(repr(local_result),106                         "<Result object\\(id={}, origin='LocalTestBear', aff"107                         'ected_code=\\(\\), severity=NORMAL, confidence=100'108                         ", message='test msg', aspect=NoneType, "109                         'applied_actions={}\\) at '110                         '0x[0-9a-fA-F]+>'.format(hex(local_result.id), '{}'))111        self.assertRegex(repr(global_result),112                         "<Result object\\(id={}, origin='GlobalTestBear', "113                         'affected_code=\\(.*start=.*file=.*section_executor_'114                         'test_files.*line=None.*end=.*\\), severity=NORMAL, c'115                         "onfidence=100, message='test message', "116                         'aspect=NoneType, applied_actions={}\\'117                         ') at 0x[0-9a-fA-F]+>'.format(hex(global_result.id),118                                                       '{}'))119    def test_empty_run(self):120        execute_section(self.sections['cli'],121                        [],122                        [],123                        lambda *args: self.result_queue.put(args[2]),124                        None,125                        self.log_printer,126                        console_printer=self.console_printer)127        self.sections['cli'].append(Setting('jobs', 'bogus!'))128        results = execute_section(self.sections['cli'],129                                  [],130                                  [],131                                  lambda *args: self.result_queue.put(args[2]),132                                  None,133                                  self.log_printer,134                                  console_printer=self.console_printer)135        # No results136        self.assertFalse(results[0])137        # One file138        self.assertEqual(len(results[1]), 1)139        # No global bear140        self.assertEqual(len(results[2]), 0)141    def test_mixed_run(self):142        self.sections['mixed'].append(Setting('jobs', '1'))143        log_printer = ListLogPrinter()144        global_bears = self.global_bears['mixed']145        local_bears = self.local_bears['mixed']146        bears = global_bears + local_bears147        with LogCapture() as capture:148            execute_section(self.sections['mixed'],149                            global_bears,150                            local_bears,151                            lambda *args: self.result_queue.put(args[2]),152                            None,153                            log_printer,154                            console_printer=self.console_printer)155        capture.check(156            ('root', 'ERROR', "Bears that uses raw files can't be mixed with "157                              'Bears that uses text files. Please move the '158                              'following bears to their own section: ' +159             ', '.join(bear.name for bear in bears if not bear.USE_RAW_FILES))160        )161    def test_raw_run(self):162        self.sections['raw'].append(Setting('jobs', '1'))163        results = execute_section(self.sections['raw'],164                                  self.global_bears['raw'],165                                  self.local_bears['raw'],166                                  lambda *args: self.result_queue.put(args[2]),167                                  None,168                                  self.log_printer,169                                  console_printer=self.console_printer)170        self.assertTrue(results[0])171        # One file172        self.assertEqual(len(results[1]), 1)173        # One global bear174        self.assertEqual(len(results[2]), 1)175        # The only file tested should be the unreadable file176        # HACK: The real test of seeing the content of the array177        #       is the same as expected will fail on Windows178        #       due to a problem with how coala handles path.179        self.assertEqual(self.unreadable_path.lower(),180                         results[1].keys()[0].lower())181        # HACK: This is due to the problem with how coala handles paths182        #       that makes it problematic for Windows compatibility183        self.unreadable_path = results[1].keys()[0]184        self.assertEqual([bear.name for bear in self.global_bears['raw']],185                         results[2].keys())186        self.assertEqual(results[1][self.unreadable_path],187                         [Result('LocalTestRawBear', 'test msg')])188        self.assertEqual(results[2][self.global_bears['raw'][0].name],189                         [Result.from_values('GlobalTestRawBear',190                                             'test message',191                                             self.unreadable_path)])192    def test_process_queues(self):193        ctrlq = queue.Queue()194        # Append custom controlling sequences.195        # Simulated process 1196        ctrlq.put((CONTROL_ELEMENT.LOCAL, 1))197        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))198        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))199        # Simulated process 2200        ctrlq.put((CONTROL_ELEMENT.LOCAL, 2))201        # Simulated process 1202        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))203        # Simulated process 2204        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))205        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))206        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))207        first_local = Result.from_values('o', 'The first result.', file='f')208        second_local = Result.from_values('ABear',209                                          'The second result.',210                                          file='f',211                                          line=1)212        third_local = Result.from_values('ABear',213                                         'The second result.',214                                         file='f',215                                         line=4)216        fourth_local = Result.from_values('ABear',217                                          'Another result.',218                                          file='f',219                                          line=7)220        first_global = Result('o', 'The one and only global result.')221        section = Section('')222        section.append(Setting('min_severity', 'normal'))223        process_queues(224            [DummyProcess(control_queue=ctrlq) for i in range(3)],225            ctrlq,226            {1: [first_local,227                 second_local,228                 third_local,229                 # The following are to be ignored230                 Result('o', 'm', severity=RESULT_SEVERITY.INFO),231                 Result.from_values('ABear', 'u', 'f', 2, 1),232                 Result.from_values('ABear', 'u', 'f', 3, 1)],233             2: [fourth_local,234                 # The following are to be ignored235                 HiddenResult('t', 'c'),236                 Result.from_values('ABear', 'u', 'f', 5, 1),237                 Result.from_values('ABear', 'u', 'f', 6, 1)]},238            {1: [first_global]},239            {'f': ['first line  # stop ignoring, invalid ignore range\n',240                   'second line  # ignore all\n',241                   'third line\n',242                   "fourth line  # gnore shouldn't trigger without i!\n",243                   '# Start ignoring ABear, BBear and CBear\n',244                   '# Stop ignoring\n',245                   'seventh']},246            lambda *args: self.queue.put(args[2]),247            section,248            None,249            self.log_printer,250            self.console_printer)251        self.assertEqual(self.queue.get(timeout=0), ([second_local,252                                                      third_local]))253        self.assertEqual(self.queue.get(timeout=0), ([fourth_local]))254        self.assertEqual(self.queue.get(timeout=0), ([first_global]))255        self.assertEqual(self.queue.get(timeout=0), ([first_global]))256    def test_dead_processes(self):257        ctrlq = queue.Queue()258        # Not enough FINISH elements in the queue, processes start already dead259        # Also queue elements are reversed260        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))261        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))262        process_queues(263            [DummyProcess(ctrlq, starts_dead=True) for i in range(3)],264            ctrlq, {}, {}, {},265            lambda *args: self.queue.put(args[2]),266            Section(''),267            None,268            self.log_printer,269            self.console_printer)270        with self.assertRaises(queue.Empty):271            self.queue.get(timeout=0)272        # Not enough FINISH elements in the queue, processes start already dead273        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))274        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))275        process_queues(276            [DummyProcess(ctrlq, starts_dead=True) for i in range(3)],277            ctrlq, {}, {}, {},278            lambda *args: self.queue.put(args[2]),279            Section(''),280            None,281            self.log_printer,282            self.console_printer)283        with self.assertRaises(queue.Empty):284            self.queue.get(timeout=0)285    def test_create_process_group(self):286        p = create_process_group([sys.executable,287                                  '-c',288                                  process_group_test_code],289                                 stdout=subprocess.PIPE,290                                 stderr=subprocess.PIPE)291        retval = p.wait()292        if retval != 0:293            for line in p.stderr:294                print(line, end='')295            raise Exception('Subprocess did not exit correctly')296        output = [i for i in p.stdout]297        p.stderr.close()298        p.stdout.close()299        pid, pgid = [int(i.strip()) for i_out in output for i in i_out.split()]300        if platform.system() != 'Windows':301            # There is no way of testing this on windows with the current302            # python modules subprocess and os303            self.assertEqual(p.pid, pgid)304    def test_filter_raising_callables(self):305        class A(Exception):306            pass307        class B(Exception):308            pass309        class C(Exception):310            pass311        def create_exception_raiser(exception):312            def raiser(exc):313                if exception in exc:314                    raise exception315                return exception316            return raiser317        raiseA, raiseB, raiseC = (create_exception_raiser(exc)318                                  for exc in [A, B, C])319        test_list = [raiseA, raiseC, raiseB, raiseC]320        self.assertEqual(list(filter_raising_callables(test_list, A, (A,))),321                         [C, B, C])322        self.assertEqual(list(filter_raising_callables(test_list,323                                                       (B, C),324                                                       exc=(B, C))),325                         [A])326        # Test whether non filtered exceptions bubble up.327        with self.assertRaises(B):328            list(filter_raising_callables(test_list, C, exc=(B, C)))329    def test_get_file_dict(self):330        with LogCapture() as capture:331            file_dict = get_file_dict([self.testcode_c_path], self.log_printer)332        self.assertEqual(len(file_dict), 1)333        self.assertEqual(type(file_dict[self.testcode_c_path]),334                         tuple,335                         msg='files in file_dict should not be editable')336        capture.check(337            ('root', 'DEBUG', 'Files that will be checked:\n' +338             self.testcode_c_path)339        )340    def test_get_file_dict_non_existent_file(self):341        with LogCapture() as capture:342            file_dict = get_file_dict(['non_existent_file'], self.log_printer)343        self.assertEqual(file_dict, {})344        capture.check(345            ('root', 'WARNING',346             StringComparison(r".*Failed to read file 'non_existent_file' "347                              r'because of an unknown error.*')),348            ('root', 'INFO', StringComparison(r'.*Exception was:.*')),349            ('root', 'DEBUG',350             StringComparison(r'.*Files that will be checked.*'))351        )352    def test_get_file_dict_allow_raw_file(self):353        log_printer = ListLogPrinter()354        with LogCapture() as capture:355            file_dict = get_file_dict([self.unreadable_path], log_printer,356                                      True)357        self.assertNotEqual(file_dict, {})358        self.assertEqual(file_dict[self.unreadable_path], None)359        capture.check(360            ('root', 'DEBUG', StringComparison(r'(?s).*Files that will be '361                                               r'checked(?s).*'))362        )363    def test_get_file_dict_forbid_raw_file(self):364        log_printer = ListLogPrinter()365        with LogCapture() as capture:366            file_dict = get_file_dict([self.unreadable_path], log_printer,367                                      False)368        self.assertEqual(file_dict, {})369        capture.check(370            ('root', 'WARNING', "Failed to read file '{}'. It seems to contain "371             'non-unicode characters. Leaving it out.'372                .format(self.unreadable_path)),373            ('root', 'DEBUG', StringComparison(r'(?s).*Files that will be '374                                               r'checked(?s).*'))375        )376    def test_simplify_section_result(self):377        results = (True,378                   {'file1': [Result('a', 'b')], 'file2': None},379                   {'file3': [Result('a', 'c')]},380                   None)381        yielded, yielded_unfixed, all_results = simplify_section_result(results)382        self.assertEqual(yielded, True)383        self.assertEqual(yielded_unfixed, True)384        self.assertEqual(len(all_results), 2)385    def test_ignore_results(self):386        ranges = [([], SourceRange.from_values('f', 1, 1, 2, 2))]387        result = Result.from_values('origin (Something Specific)',388                                    'message',389                                    file='e',390                                    line=1,391                                    column=1,392                                    end_line=2,393                                    end_column=2)394        self.assertFalse(check_result_ignore(result, ranges))395        ranges.append(([], SourceRange.from_values('e', 2, 3, 3, 3)))396        self.assertFalse(check_result_ignore(result, ranges))397        ranges.append(([], SourceRange.from_values('e', 1, 1, 2, 2)))398        self.assertTrue(check_result_ignore(result, ranges))399        result1 = Result.from_values('origin', 'message', file='e')400        self.assertTrue(check_result_ignore(result1, ranges))401        ranges = [(['something', 'else', 'not origin'],402                   SourceRange.from_values('e', 1, 1, 2, 2))]403        self.assertFalse(check_result_ignore(result, ranges))404        ranges = [(['something', 'else', 'origin'],405                   SourceRange.from_values('e', 1, 1, 2, 2))]406        self.assertTrue(check_result_ignore(result, ranges))407    def test_ignore_glob(self):408        result = Result.from_values('LineLengthBear',409                                    'message',410                                    file='d',411                                    line=1,412                                    column=1,413                                    end_line=2,414                                    end_column=2)415        ranges = [(['(line*|space*)', 'py*'],416                   SourceRange.from_values('d', 1, 1, 2, 2))]417        self.assertTrue(check_result_ignore(result, ranges))418        result = Result.from_values('SpaceConsistencyBear',419                                    'message',420                                    file='d',421                                    line=1,422                                    column=1,423                                    end_line=2,424                                    end_column=2)425        ranges = [(['(line*|space*)', 'py*'],426                   SourceRange.from_values('d', 1, 1, 2, 2))]427        self.assertTrue(check_result_ignore(result, ranges))428        result = Result.from_values('XMLBear',429                                    'message',430                                    file='d',431                                    line=1,432                                    column=1,433                                    end_line=2,434                                    end_column=2)435        ranges = [(['(line*|space*)', 'py*'],436                   SourceRange.from_values('d', 1, 1, 2, 2))]437        self.assertFalse(check_result_ignore(result, ranges))438    def test_yield_ignore_ranges(self):439        test_file_dict_a = {'f':440                            ('# Ignore aBear\n',441                             'a_string = "This string should be ignored"\n')}442        test_ignore_range_a = list(yield_ignore_ranges(test_file_dict_a))443        for test_bears, test_source_range in test_ignore_range_a:444            self.assertEqual(test_bears, ['abear'])445            self.assertEqual(test_source_range.start.line, 1)446            self.assertEqual(test_source_range.start.column, 1)447            self.assertEqual(test_source_range.end.line, 2)448            self.assertEqual(test_source_range.end.column, 43)449        test_file_dict_b = {'f':450                            ('# start Ignoring bBear\n',451                             'b_string = "This string should be ignored"\n',452                             '# stop ignoring\n')}453        test_ignore_range_b = list(yield_ignore_ranges(test_file_dict_b))454        for test_bears, test_source_range in test_ignore_range_b:455            self.assertEqual(test_bears, ['bbear'])456            self.assertEqual(test_source_range.start.line, 1)457            self.assertEqual(test_source_range.start.column, 1)458            self.assertEqual(test_source_range.end.line, 3)459            self.assertEqual(test_source_range.end.column, 16)460        test_file_dict_c = {'f':461                            ('# Start ignoring cBear\n',462                             '# Stop ignoring cBear This & prev ignored\n')}463        test_ignore_range_c = list(yield_ignore_ranges(test_file_dict_c))464        for test_bears, test_source_range in test_ignore_range_c:465            self.assertEqual(test_bears, ['cbear'])466            self.assertEqual(test_source_range.start.line, 1)467            self.assertEqual(test_source_range.start.column, 1)468            self.assertEqual(test_source_range.end.line, 2)469            self.assertEqual(test_source_range.end.column, 42)470        test_file_dict_d = {'f':471                            ('# Start ignoring cBear\n',472                             'All of this ignored\n')}473        test_ignore_range_d = list(yield_ignore_ranges(test_file_dict_d))474        for test_bears, test_source_range in test_ignore_range_d:475            self.assertEqual(test_bears, ['cbear'])476            self.assertEqual(test_source_range.start.line, 1)477            self.assertEqual(test_source_range.start.column, 1)478            self.assertEqual(test_source_range.end.line, 2)479            self.assertEqual(test_source_range.end.column, 20)480        test_file_dict_e = {'f':481                            ('# Ignore all\n',482                             'e_string = "This string should be ignored"\n')}483        test_ignore_range_e = list(yield_ignore_ranges(test_file_dict_e))484        for test_bears, test_source_range in test_ignore_range_e:485            self.assertEqual(test_bears, [])486            self.assertEqual(test_source_range.start.line, 1)487            self.assertEqual(test_source_range.start.column, 1)488            self.assertEqual(test_source_range.end.line, 2)489            self.assertEqual(test_source_range.end.column, 43)490        test_file_dict_n = {'f':491                            ('# noqa nBear\n',492                             'n_string = "This string should be ignored"\n')}493        test_ignore_range_n = list(yield_ignore_ranges(test_file_dict_n))494        for test_bears, test_source_range in test_ignore_range_n:495            self.assertEqual(test_bears, ['nbear'])496            self.assertEqual(test_source_range.start.line, 1)497            self.assertEqual(test_source_range.start.column, 1)498            self.assertEqual(test_source_range.end.line, 2)499            self.assertEqual(test_source_range.end.column, 43)500        test_file_dict_n = {'f':501                            ('# noqa\n',502                             'n_string = "This string should be ignored"\n')}503        test_ignore_range_n = list(yield_ignore_ranges(test_file_dict_n))504        for test_bears, test_source_range in test_ignore_range_n:505            self.assertEqual(test_bears, [])506            self.assertEqual(test_source_range.start.line, 1)507            self.assertEqual(test_source_range.start.column, 1)508            self.assertEqual(test_source_range.end.line, 2)509            self.assertEqual(test_source_range.end.column, 43)510        # This case was a bug.511        test_file_dict_single_line = {'f': ('# ignore XBEAR',)}512        test_ignore_range_single_line = list(yield_ignore_ranges(513            test_file_dict_single_line))514        self.assertEqual(len(test_ignore_range_single_line), 1)515        bears, source_range = test_ignore_range_single_line[0]516        self.assertEqual(bears, ['xbear'])517        self.assertEqual(source_range.start.line, 1)518        self.assertEqual(source_range.start.column, 1)519        self.assertEqual(source_range.end.line, 1)520        self.assertEqual(source_range.end.column, 14)521    def test_loaded_bears_with_error_result(self):522        class BearWithMissingPrerequisites(Bear):523            def __init__(self, section, queue, timeout=0.1):524                Bear.__init__(self, section, queue, timeout)525            def run(self):526                return []527            @classmethod528            def check_prerequisites(cls):529                return False530        multiprocessing.Queue()531        tmp_local_bears = copy.copy(self.local_bears['cli'])532        tmp_local_bears.append(BearWithMissingPrerequisites)533        cache = FileCache(self.log_printer,534                          'coala_test_on_error',535                          flush_cache=True)536        results = execute_section(self.sections['cli'],537                                  [],538                                  tmp_local_bears,539                                  lambda *args: self.result_queue.put(args[2]),540                                  cache,541                                  self.log_printer,542                                  console_printer=self.console_printer)543        self.assertEqual(len(cache.data), 0)544        cache = FileCache(self.log_printer,545                          'coala_test_on_error',546                          flush_cache=False)547        results = execute_section(self.sections['cli'],548                                  [],549                                  self.local_bears['cli'],550                                  lambda *args: self.result_queue.put(args[2]),551                                  cache,552                                  self.log_printer,553                                  console_printer=self.console_printer)554        self.assertGreater(len(cache.data), 0)555class ProcessingTest_GetDefaultActions(unittest.TestCase):556    def setUp(self):557        self.section = Section('X')558    def test_no_key(self):559        self.assertEqual(get_default_actions(self.section), ({}, {}))560    def test_no_value(self):561        self.section.append(Setting('default_actions', ''))...11875_ProcessingTest.py
Source:11875_ProcessingTest.py  
...81        self.assertEqual(targets, [])82    def test_run(self):83        self.sections['cli'].append(Setting('jobs', '1'))84        cache = FileCache(self.log_printer, 'coala_test', flush_cache=True)85        results = execute_section(self.sections['cli'],86                                  self.global_bears['cli'],87                                  self.local_bears['cli'],88                                  lambda *args: self.result_queue.put(args[2]),89                                  cache,90                                  self.log_printer,91                                  console_printer=self.console_printer)92        self.assertTrue(results[0])93        local_results = self.result_queue.get(timeout=0)94        global_results = self.result_queue.get(timeout=0)95        self.assertTrue(self.result_queue.empty())96        self.assertEqual(len(local_results), 1)97        self.assertEqual(len(global_results), 1)98        # Result dict also returned99        # One file100        self.assertEqual(len(results[1]), 1)101        # One global bear102        self.assertEqual(len(results[2]), 1)103        local_result = local_results[0]104        global_result = global_results[0]105        self.assertRegex(repr(local_result),106                         "<Result object\\(id={}, origin='LocalTestBear', aff"107                         'ected_code=\\(\\), severity=NORMAL, confidence=100'108                         ", message='test msg', aspect=NoneType, "109                         'applied_actions={}\\) at '110                         '0x[0-9a-fA-F]+>'.format(hex(local_result.id), '{}'))111        self.assertRegex(repr(global_result),112                         "<Result object\\(id={}, origin='GlobalTestBear', "113                         'affected_code=\\(.*start=.*file=.*section_executor_'114                         'test_files.*line=None.*end=.*\\), severity=NORMAL, c'115                         "onfidence=100, message='test message', "116                         'aspect=NoneType, applied_actions={}\\'117                         ') at 0x[0-9a-fA-F]+>'.format(hex(global_result.id),118                                                       '{}'))119    def test_empty_run(self):120        execute_section(self.sections['cli'],121                        [],122                        [],123                        lambda *args: self.result_queue.put(args[2]),124                        None,125                        self.log_printer,126                        console_printer=self.console_printer)127        self.sections['cli'].append(Setting('jobs', 'bogus!'))128        results = execute_section(self.sections['cli'],129                                  [],130                                  [],131                                  lambda *args: self.result_queue.put(args[2]),132                                  None,133                                  self.log_printer,134                                  console_printer=self.console_printer)135        # No results136        self.assertFalse(results[0])137        # One file138        self.assertEqual(len(results[1]), 1)139        # No global bear140        self.assertEqual(len(results[2]), 0)141    def test_mixed_run(self):142        self.sections['mixed'].append(Setting('jobs', '1'))143        log_printer = ListLogPrinter()144        global_bears = self.global_bears['mixed']145        local_bears = self.local_bears['mixed']146        bears = global_bears + local_bears147        with LogCapture() as capture:148            execute_section(self.sections['mixed'],149                            global_bears,150                            local_bears,151                            lambda *args: self.result_queue.put(args[2]),152                            None,153                            log_printer,154                            console_printer=self.console_printer)155        capture.check(156            ('root', 'ERROR', "Bears that uses raw files can't be mixed with "157                              'Bears that uses text files. Please move the '158                              'following bears to their own section: ' +159             ', '.join(bear.name for bear in bears if not bear.USE_RAW_FILES))160        )161    def test_raw_run(self):162        self.sections['raw'].append(Setting('jobs', '1'))163        results = execute_section(self.sections['raw'],164                                  self.global_bears['raw'],165                                  self.local_bears['raw'],166                                  lambda *args: self.result_queue.put(args[2]),167                                  None,168                                  self.log_printer,169                                  console_printer=self.console_printer)170        self.assertTrue(results[0])171        # One file172        self.assertEqual(len(results[1]), 1)173        # One global bear174        self.assertEqual(len(results[2]), 1)175        # The only file tested should be the unreadable file176        # HACK: The real test of seeing the content of the array177        #       is the same as expected will fail on Windows178        #       due to a problem with how coala handles path.179        self.assertEqual(self.unreadable_path.lower(),180                         results[1].keys()[0].lower())181        # HACK: This is due to the problem with how coala handles paths182        #       that makes it problematic for Windows compatibility183        self.unreadable_path = results[1].keys()[0]184        self.assertEqual([bear.name for bear in self.global_bears['raw']],185                         results[2].keys())186        self.assertEqual(results[1][self.unreadable_path],187                         [Result('LocalTestRawBear', 'test msg')])188        self.assertEqual(results[2][self.global_bears['raw'][0].name],189                         [Result.from_values('GlobalTestRawBear',190                                             'test message',191                                             self.unreadable_path)])192    def test_process_queues(self):193        ctrlq = queue.Queue()194        # Append custom controlling sequences.195        # Simulated process 1196        ctrlq.put((CONTROL_ELEMENT.LOCAL, 1))197        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))198        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))199        # Simulated process 2200        ctrlq.put((CONTROL_ELEMENT.LOCAL, 2))201        # Simulated process 1202        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))203        # Simulated process 2204        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))205        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))206        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))207        first_local = Result.from_values('o', 'The first result.', file='f')208        second_local = Result.from_values('ABear',209                                          'The second result.',210                                          file='f',211                                          line=1)212        third_local = Result.from_values('ABear',213                                         'The second result.',214                                         file='f',215                                         line=4)216        fourth_local = Result.from_values('ABear',217                                          'Another result.',218                                          file='f',219                                          line=7)220        first_global = Result('o', 'The one and only global result.')221        section = Section('')222        section.append(Setting('min_severity', 'normal'))223        process_queues(224            [DummyProcess(control_queue=ctrlq) for i in range(3)],225            ctrlq,226            {1: [first_local,227                 second_local,228                 third_local,229                 # The following are to be ignored230                 Result('o', 'm', severity=RESULT_SEVERITY.INFO),231                 Result.from_values('ABear', 'u', 'f', 2, 1),232                 Result.from_values('ABear', 'u', 'f', 3, 1)],233             2: [fourth_local,234                 # The following are to be ignored235                 HiddenResult('t', 'c'),236                 Result.from_values('ABear', 'u', 'f', 5, 1),237                 Result.from_values('ABear', 'u', 'f', 6, 1)]},238            {1: [first_global]},239            {'f': ['first line  # stop ignoring, invalid ignore range\n',240                   'second line  # ignore all\n',241                   'third line\n',242                   "fourth line  # gnore shouldn't trigger without i!\n",243                   '# Start ignoring ABear, BBear and CBear\n',244                   '# Stop ignoring\n',245                   'seventh']},246            lambda *args: self.queue.put(args[2]),247            section,248            None,249            self.log_printer,250            self.console_printer)251        self.assertEqual(self.queue.get(timeout=0), ([second_local,252                                                      third_local]))253        self.assertEqual(self.queue.get(timeout=0), ([fourth_local]))254        self.assertEqual(self.queue.get(timeout=0), ([first_global]))255        self.assertEqual(self.queue.get(timeout=0), ([first_global]))256    def test_dead_processes(self):257        ctrlq = queue.Queue()258        # Not enough FINISH elements in the queue, processes start already dead259        # Also queue elements are reversed260        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))261        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))262        process_queues(263            [DummyProcess(ctrlq, starts_dead=True) for i in range(3)],264            ctrlq, {}, {}, {},265            lambda *args: self.queue.put(args[2]),266            Section(''),267            None,268            self.log_printer,269            self.console_printer)270        with self.assertRaises(queue.Empty):271            self.queue.get(timeout=0)272        # Not enough FINISH elements in the queue, processes start already dead273        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))274        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))275        process_queues(276            [DummyProcess(ctrlq, starts_dead=True) for i in range(3)],277            ctrlq, {}, {}, {},278            lambda *args: self.queue.put(args[2]),279            Section(''),280            None,281            self.log_printer,282            self.console_printer)283        with self.assertRaises(queue.Empty):284            self.queue.get(timeout=0)285    def test_create_process_group(self):286        p = create_process_group([sys.executable,287                                  '-c',288                                  process_group_test_code],289                                 stdout=subprocess.PIPE,290                                 stderr=subprocess.PIPE)291        retval = p.wait()292        if retval != 0:293            for line in p.stderr:294                print(line, end='')295            raise Exception('Subprocess did not exit correctly')296        output = [i for i in p.stdout]297        p.stderr.close()298        p.stdout.close()299        pid, pgid = [int(i.strip()) for i_out in output for i in i_out.split()]300        if platform.system() != 'Windows':301            # There is no way of testing this on windows with the current302            # python modules subprocess and os303            self.assertEqual(p.pid, pgid)304    def test_filter_raising_callables(self):305        class A(Exception):306            pass307        class B(Exception):308            pass309        class C(Exception):310            pass311        def create_exception_raiser(exception):312            def raiser(exc):313                if exception in exc:314                    raise exception315                return exception316            return raiser317        raiseA, raiseB, raiseC = (create_exception_raiser(exc)318                                  for exc in [A, B, C])319        test_list = [raiseA, raiseC, raiseB, raiseC]320        self.assertEqual(list(filter_raising_callables(test_list, A, (A,))),321                         [C, B, C])322        self.assertEqual(list(filter_raising_callables(test_list,323                                                       (B, C),324                                                       exc=(B, C))),325                         [A])326        # Test whether non filtered exceptions bubble up.327        with self.assertRaises(B):328            list(filter_raising_callables(test_list, C, exc=(B, C)))329    def test_get_file_dict(self):330        with LogCapture() as capture:331            file_dict = get_file_dict([self.testcode_c_path], self.log_printer)332        self.assertEqual(len(file_dict), 1)333        self.assertEqual(type(file_dict[self.testcode_c_path]),334                         tuple,335                         msg='files in file_dict should not be editable')336        capture.check(337            ('root', 'DEBUG', 'Files that will be checked:\n' +338             self.testcode_c_path)339        )340    def test_get_file_dict_non_existent_file(self):341        with LogCapture() as capture:342            file_dict = get_file_dict(['non_existent_file'], self.log_printer)343        self.assertEqual(file_dict, {})344        capture.check(345            ('root', 'WARNING',346             StringComparison(r".*Failed to read file 'non_existent_file' "347                              r'because of an unknown error.*')),348            ('root', 'INFO', StringComparison(r'.*Exception was:.*')),349            ('root', 'DEBUG',350             StringComparison(r'.*Files that will be checked.*'))351        )352    def test_get_file_dict_allow_raw_file(self):353        log_printer = ListLogPrinter()354        with LogCapture() as capture:355            file_dict = get_file_dict([self.unreadable_path], log_printer,356                                      True)357        self.assertNotEqual(file_dict, {})358        self.assertEqual(file_dict[self.unreadable_path], None)359        capture.check(360            ('root', 'DEBUG', StringComparison(r'(?s).*Files that will be '361                                               r'checked(?s).*'))362        )363    def test_get_file_dict_forbid_raw_file(self):364        log_printer = ListLogPrinter()365        with LogCapture() as capture:366            file_dict = get_file_dict([self.unreadable_path], log_printer,367                                      False)368        self.assertEqual(file_dict, {})369        capture.check(370            ('root', 'WARNING', "Failed to read file '{}'. It seems to contain "371             'non-unicode characters. Leaving it out.'372                .format(self.unreadable_path)),373            ('root', 'DEBUG', StringComparison(r'(?s).*Files that will be '374                                               r'checked(?s).*'))375        )376    def test_simplify_section_result(self):377        results = (True,378                   {'file1': [Result('a', 'b')], 'file2': None},379                   {'file3': [Result('a', 'c')]},380                   None)381        yielded, yielded_unfixed, all_results = simplify_section_result(results)382        self.assertEqual(yielded, True)383        self.assertEqual(yielded_unfixed, True)384        self.assertEqual(len(all_results), 2)385    def test_ignore_results(self):386        ranges = [([], SourceRange.from_values('f', 1, 1, 2, 2))]387        result = Result.from_values('origin (Something Specific)',388                                    'message',389                                    file='e',390                                    line=1,391                                    column=1,392                                    end_line=2,393                                    end_column=2)394        self.assertFalse(check_result_ignore(result, ranges))395        ranges.append(([], SourceRange.from_values('e', 2, 3, 3, 3)))396        self.assertFalse(check_result_ignore(result, ranges))397        ranges.append(([], SourceRange.from_values('e', 1, 1, 2, 2)))398        self.assertTrue(check_result_ignore(result, ranges))399        result1 = Result.from_values('origin', 'message', file='e')400        self.assertTrue(check_result_ignore(result1, ranges))401        ranges = [(['something', 'else', 'not origin'],402                   SourceRange.from_values('e', 1, 1, 2, 2))]403        self.assertFalse(check_result_ignore(result, ranges))404        ranges = [(['something', 'else', 'origin'],405                   SourceRange.from_values('e', 1, 1, 2, 2))]406        self.assertTrue(check_result_ignore(result, ranges))407    def test_ignore_glob(self):408        result = Result.from_values('LineLengthBear',409                                    'message',410                                    file='d',411                                    line=1,412                                    column=1,413                                    end_line=2,414                                    end_column=2)415        ranges = [(['(line*|space*)', 'py*'],416                   SourceRange.from_values('d', 1, 1, 2, 2))]417        self.assertTrue(check_result_ignore(result, ranges))418        result = Result.from_values('SpaceConsistencyBear',419                                    'message',420                                    file='d',421                                    line=1,422                                    column=1,423                                    end_line=2,424                                    end_column=2)425        ranges = [(['(line*|space*)', 'py*'],426                   SourceRange.from_values('d', 1, 1, 2, 2))]427        self.assertTrue(check_result_ignore(result, ranges))428        result = Result.from_values('XMLBear',429                                    'message',430                                    file='d',431                                    line=1,432                                    column=1,433                                    end_line=2,434                                    end_column=2)435        ranges = [(['(line*|space*)', 'py*'],436                   SourceRange.from_values('d', 1, 1, 2, 2))]437        self.assertFalse(check_result_ignore(result, ranges))438    def test_yield_ignore_ranges(self):439        test_file_dict_a = {'f':440                            ('# Ignore aBear\n',441                             'a_string = "This string should be ignored"\n')}442        test_ignore_range_a = list(yield_ignore_ranges(test_file_dict_a))443        for test_bears, test_source_range in test_ignore_range_a:444            self.assertEqual(test_bears, ['abear'])445            self.assertEqual(test_source_range.start.line, 1)446            self.assertEqual(test_source_range.start.column, 1)447            self.assertEqual(test_source_range.end.line, 2)448            self.assertEqual(test_source_range.end.column, 43)449        test_file_dict_b = {'f':450                            ('# start Ignoring bBear\n',451                             'b_string = "This string should be ignored"\n',452                             '# stop ignoring\n')}453        test_ignore_range_b = list(yield_ignore_ranges(test_file_dict_b))454        for test_bears, test_source_range in test_ignore_range_b:455            self.assertEqual(test_bears, ['bbear'])456            self.assertEqual(test_source_range.start.line, 1)457            self.assertEqual(test_source_range.start.column, 1)458            self.assertEqual(test_source_range.end.line, 3)459            self.assertEqual(test_source_range.end.column, 16)460        test_file_dict_c = {'f':461                            ('# Start ignoring cBear\n',462                             '# Stop ignoring cBear This & prev ignored\n')}463        test_ignore_range_c = list(yield_ignore_ranges(test_file_dict_c))464        for test_bears, test_source_range in test_ignore_range_c:465            self.assertEqual(test_bears, ['cbear'])466            self.assertEqual(test_source_range.start.line, 1)467            self.assertEqual(test_source_range.start.column, 1)468            self.assertEqual(test_source_range.end.line, 2)469            self.assertEqual(test_source_range.end.column, 42)470        test_file_dict_d = {'f':471                            ('# Start ignoring cBear\n',472                             'All of this ignored\n')}473        test_ignore_range_d = list(yield_ignore_ranges(test_file_dict_d))474        for test_bears, test_source_range in test_ignore_range_d:475            self.assertEqual(test_bears, ['cbear'])476            self.assertEqual(test_source_range.start.line, 1)477            self.assertEqual(test_source_range.start.column, 1)478            self.assertEqual(test_source_range.end.line, 2)479            self.assertEqual(test_source_range.end.column, 20)480        test_file_dict_e = {'f':481                            ('# Ignore all\n',482                             'e_string = "This string should be ignored"\n')}483        test_ignore_range_e = list(yield_ignore_ranges(test_file_dict_e))484        for test_bears, test_source_range in test_ignore_range_e:485            self.assertEqual(test_bears, [])486            self.assertEqual(test_source_range.start.line, 1)487            self.assertEqual(test_source_range.start.column, 1)488            self.assertEqual(test_source_range.end.line, 2)489            self.assertEqual(test_source_range.end.column, 43)490        test_file_dict_n = {'f':491                            ('# noqa nBear\n',492                             'n_string = "This string should be ignored"\n')}493        test_ignore_range_n = list(yield_ignore_ranges(test_file_dict_n))494        for test_bears, test_source_range in test_ignore_range_n:495            self.assertEqual(test_bears, ['nbear'])496            self.assertEqual(test_source_range.start.line, 1)497            self.assertEqual(test_source_range.start.column, 1)498            self.assertEqual(test_source_range.end.line, 2)499            self.assertEqual(test_source_range.end.column, 43)500        test_file_dict_n = {'f':501                            ('# noqa\n',502                             'n_string = "This string should be ignored"\n')}503        test_ignore_range_n = list(yield_ignore_ranges(test_file_dict_n))504        for test_bears, test_source_range in test_ignore_range_n:505            self.assertEqual(test_bears, [])506            self.assertEqual(test_source_range.start.line, 1)507            self.assertEqual(test_source_range.start.column, 1)508            self.assertEqual(test_source_range.end.line, 2)509            self.assertEqual(test_source_range.end.column, 43)510        # This case was a bug.511        test_file_dict_single_line = {'f': ('# ignore XBEAR',)}512        test_ignore_range_single_line = list(yield_ignore_ranges(513            test_file_dict_single_line))514        self.assertEqual(len(test_ignore_range_single_line), 1)515        bears, source_range = test_ignore_range_single_line[0]516        self.assertEqual(bears, ['xbear'])517        self.assertEqual(source_range.start.line, 1)518        self.assertEqual(source_range.start.column, 1)519        self.assertEqual(source_range.end.line, 1)520        self.assertEqual(source_range.end.column, 14)521    def test_loaded_bears_with_error_result(self):522        class BearWithMissingPrerequisites(Bear):523            def __init__(self, section, queue, timeout=0.1):524                Bear.__init__(self, section, queue, timeout)525            def run(self):526                return []527            @classmethod528            def check_prerequisites(cls):529                return False530        multiprocessing.Queue()531        tmp_local_bears = copy.copy(self.local_bears['cli'])532        tmp_local_bears.append(BearWithMissingPrerequisites)533        cache = FileCache(self.log_printer,534                          'coala_test_on_error',535                          flush_cache=True)536        results = execute_section(self.sections['cli'],537                                  [],538                                  tmp_local_bears,539                                  lambda *args: self.result_queue.put(args[2]),540                                  cache,541                                  self.log_printer,542                                  console_printer=self.console_printer)543        self.assertEqual(len(cache.data), 0)544        cache = FileCache(self.log_printer,545                          'coala_test_on_error',546                          flush_cache=False)547        results = execute_section(self.sections['cli'],548                                  [],549                                  self.local_bears['cli'],550                                  lambda *args: self.result_queue.put(args[2]),551                                  cache,552                                  self.log_printer,553                                  console_printer=self.console_printer)554        self.assertGreater(len(cache.data), 0)555class ProcessingTest_GetDefaultActions(unittest.TestCase):556    def setUp(self):557        self.section = Section('X')558    def test_no_key(self):559        self.assertEqual(get_default_actions(self.section), ({}, {}))560    def test_no_value(self):561        self.section.append(Setting('default_actions', ''))...__init__.py
Source:__init__.py  
1import sys2import os.path3from co2tools.mergers import merge4from co2tools.stl import solidify5from co2tools.__version__ import __version__6from yaml import load, SafeLoader7def help():8    print('co2tools [options] [action] [group]')9    print('Options:')10    print('\t--version\tPrint version')11    print('\t--base=folder\tSet base folder')12    print('Actions:')13    print('\tmerge\tRun only merge from yml file')14    print('\tsolidify\tRun only solidify from yml file')15    print('Group:')16    print('\tgpu\tDefines what group to run from yml file for given action')17def main():18    yaml_file = '.co2tools.yml'19    execute_action = None20    execute_section = None21    execute_file = None22    boolean_engine =  None23    base_folder = ''24    i = 125    while i < len(sys.argv):26        arg = sys.argv[i]27        i += 128        if arg.lower() == '--version':29            print('co2tools v{}'.format(__version__))30            quit()31        if arg.lower() == '--yaml':32            yaml_file = sys.argv[i]33            i += 134            continue35        if arg.lower() == 'help':36            help()37            quit()38        if arg.lower() == '--base':39            base_folder = sys.argv[i]40            if base_folder[-1] != '/':41                base_folder += '/'42            i += 143            continue44        if arg.lower() == '--engine':45            boolean_engine = sys.argv[i]46            i += 147            continue48        if execute_action is None:49            execute_action = arg50            continue51        if execute_section is None:52            execute_section = arg53            continue54        if execute_file is None:55            execute_file = arg56            continue57        else:58            execute_file = '{} {}'.format(execute_file, arg)59            continue60        raise BaseException(1, 'Unknown command line argument!!!')61    if base_folder is not None:62        yaml_file = '{}{}'.format(base_folder, yaml_file)63    if not os.path.exists(yaml_file):64        raise BaseException(1, 'File \'{}\' does not exist!!!'.format(yaml_file))65    with open(yaml_file, 'r') as f:66        yaml_data = load(f, Loader=SafeLoader)67        if 'merge' in yaml_data:68            merge(yaml_data, execute_action, execute_section, execute_file, base_folder)69        if 'solidify' in yaml_data:70            solidify(yaml_data, execute_action, execute_section, execute_file, base_folder, boolean_engine)71        if 'macros' in yaml_data:72            pass73if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
