Best Python code snippet using tappy_python
test_operation.py
Source:test_operation.py  
...105    def _make_link(link_id):106        return Link(command='', paw='123456', ability=test_ability, id=link_id, executor=next(test_ability.executors))107    return _make_link108@pytest.fixture109def make_test_result():110    def _make_result(link_id):111        result = dict(112            id=link_id,113            output=str(base64.b64encode('10.10.10.10'.encode('utf-8')).decode('utf-8')),114            pid=0,115            status=0116        )117        return Result(**result)118    return _make_result119@pytest.fixture120def op_with_learning_parser(ability, adversary):121    op = Operation(id='12345', name='testA', agents=[], adversary=adversary, use_learning_parsers=True)122    return op123@pytest.fixture124def op_without_learning_parser(ability, adversary):125    op = Operation(id='54321', name='testB', agents=[], adversary=adversary, use_learning_parsers=False)126    return op127@pytest.fixture128def op_with_learning_and_seeded(ability, adversary, operation_agent, parse_datestring):129    sc = Source(id='3124', name='test', facts=[Fact(trait='domain.user.name', value='bob')])130    op = Operation(id='6789', name='testC', agents=[], adversary=adversary, source=sc, use_learning_parsers=True)131    # patch operation to make it 'realistic'132    op.start = parse_datestring(OP_START_TIME)133    op.adversary = op.adversary()134    op.planner = Planner(planner_id='12345', name='test_planner',135                                                  module='not.an.actual.planner', params=None)136    op.objective = Objective(id='6428', name='not_an_objective')137    t_operation_agent = operation_agent138    t_operation_agent.paw = '123456'139    op.agents = [t_operation_agent]140    return op141class TestOperation:142    def test_ran_ability_id(self, ability, adversary):143        op = Operation(name='test', agents=[], adversary=adversary)144        mock_link = MagicMock(spec=Link, ability=ability(ability_id='123'), finish=MOCK_LINK_FINISH_TIME)145        op.chain = [mock_link]146        assert op.ran_ability_id('123')147    def test_event_logs(self, event_loop, op_for_event_logs, operation_agent, file_svc, data_svc, event_log_op_start_time,148                        op_agent_creation_time):149        event_loop.run_until_complete(data_svc.remove('agents', match=dict(unique=operation_agent.unique)))150        event_loop.run_until_complete(data_svc.store(operation_agent))151        want_agent_metadata = dict(152            paw='testpaw',153            group='red',154            architecture='amd64',155            username='testagent',156            location=r'C:\Users\Public\test.exe',157            pid=1234,158            ppid=123,159            privilege='User',160            host='WORKSTATION',161            contact='unknown',162            created=op_agent_creation_time,163        )164        want_operation_metadata = dict(165            operation_name='test',166            operation_start=event_log_op_start_time,167            operation_adversary='test adversary',168        )169        want_attack_metadata = dict(170            tactic='test tactic',171            technique_name='test technique',172            technique_id='T0000',173        )174        want = [175            dict(176                command='d2hvYW1p',177                delegated_timestamp=LINK1_DECIDE_TIME,178                collected_timestamp=LINK1_COLLECT_TIME,179                finished_timestamp=LINK1_FINISH_TIME,180                status=0,181                platform='windows',182                executor='psh',183                pid=789,184                agent_metadata=want_agent_metadata,185                ability_metadata=dict(186                  ability_id='123',187                  ability_name='test ability',188                  ability_description='test ability desc',189                ),190                operation_metadata=want_operation_metadata,191                attack_metadata=want_attack_metadata,192            ),193            dict(194                command='aG9zdG5hbWU=',195                delegated_timestamp=LINK2_DECIDE_TIME,196                collected_timestamp=LINK2_COLLECT_TIME,197                finished_timestamp=LINK2_FINISH_TIME,198                status=0,199                platform='windows',200                executor='psh',201                pid=7890,202                agent_metadata=want_agent_metadata,203                ability_metadata=dict(204                    ability_id='456',205                    ability_name='test ability 2',206                    ability_description='test ability 2 desc',207                ),208                operation_metadata=want_operation_metadata,209                attack_metadata=want_attack_metadata,210            ),211        ]212        event_logs = event_loop.run_until_complete(op_for_event_logs.event_logs(file_svc, data_svc))213        assert event_logs == want214    def test_writing_event_logs_to_disk(self, event_loop, op_for_event_logs, operation_agent, file_svc, data_svc,215                                        event_log_op_start_time, op_agent_creation_time):216        event_loop.run_until_complete(data_svc.remove('agents', match=dict(unique=operation_agent.unique)))217        event_loop.run_until_complete(data_svc.store(operation_agent))218        want_agent_metadata = dict(219            paw='testpaw',220            group='red',221            architecture='amd64',222            username='testagent',223            location=r'C:\Users\Public\test.exe',224            pid=1234,225            ppid=123,226            privilege='User',227            host='WORKSTATION',228            contact='unknown',229            created=op_agent_creation_time,230        )231        want_operation_metadata = dict(232            operation_name='test',233            operation_start=event_log_op_start_time,234            operation_adversary='test adversary',235        )236        want_attack_metadata = dict(237            tactic='test tactic',238            technique_name='test technique',239            technique_id='T0000',240        )241        want = [242            dict(243                command='d2hvYW1p',244                delegated_timestamp=LINK1_DECIDE_TIME,245                collected_timestamp=LINK1_COLLECT_TIME,246                finished_timestamp=LINK1_FINISH_TIME,247                status=0,248                platform='windows',249                executor='psh',250                pid=789,251                agent_metadata=want_agent_metadata,252                ability_metadata=dict(253                    ability_id='123',254                    ability_name='test ability',255                    ability_description='test ability desc',256                ),257                operation_metadata=want_operation_metadata,258                attack_metadata=want_attack_metadata,259            ),260            dict(261                command='aG9zdG5hbWU=',262                delegated_timestamp=LINK2_DECIDE_TIME,263                collected_timestamp=LINK2_COLLECT_TIME,264                finished_timestamp=LINK2_FINISH_TIME,265                status=0,266                platform='windows',267                executor='psh',268                pid=7890,269                agent_metadata=want_agent_metadata,270                ability_metadata=dict(271                    ability_id='456',272                    ability_name='test ability 2',273                    ability_description='test ability 2 desc',274                ),275                operation_metadata=want_operation_metadata,276                attack_metadata=want_attack_metadata,277            ),278        ]279        event_loop.run_until_complete(op_for_event_logs.write_event_logs_to_disk(file_svc, data_svc))280        target_path = '/tmp/event_logs/operation_%s.json' % op_for_event_logs.id281        assert os.path.isfile(target_path)282        try:283            with open(target_path, 'rb') as log_file:284                recorded_log = json.load(log_file)285            assert recorded_log == want286        finally:287            os.remove(target_path)288    @mock.patch.object(Operation, '_emit_state_change_event')289    def test_no_state_change_event_on_instantiation(self, mock_emit_state_change_method, fake_event_svc, adversary):290        Operation(name='test', agents=[], adversary=adversary)291        mock_emit_state_change_method.assert_not_called()292    @mock.patch.object(Operation, '_emit_state_change_event')293    def test_no_state_change_event_fired_when_setting_same_state(self, mock_emit_state_change_method, fake_event_svc,294                                                                 adversary):295        initial_state = 'running'296        op = Operation(name='test', agents=[], adversary=adversary, state=initial_state)297        op.state = initial_state298        mock_emit_state_change_method.assert_not_called()299    @mock.patch.object(Operation, '_emit_state_change_event')300    def test_state_change_event_fired_on_state_change(self, mock_emit_state_change_method, fake_event_svc, adversary):301        op = Operation(name='test', agents=[], adversary=adversary, state='running')302        op.state = 'finished'303        mock_emit_state_change_method.assert_called_with(from_state='running', to_state='finished')304    def test_emit_state_change_event(self, event_loop, fake_event_svc, adversary):305        op = Operation(name='test', agents=[], adversary=adversary, state='running')306        fake_event_svc.reset()307        event_loop.run_until_complete(308            op._emit_state_change_event(309                from_state='running',310                to_state='finished'311            )312        )313        expected_key = (Operation.EVENT_EXCHANGE, Operation.EVENT_QUEUE_STATE_CHANGED)314        assert expected_key in fake_event_svc.fired315        event_kwargs = fake_event_svc.fired[expected_key]316        assert event_kwargs['op'] == op.id317        assert event_kwargs['from_state'] == 'running'318        assert event_kwargs['to_state'] == 'finished'319    def test_with_learning_parser(self, event_loop, contact_svc, data_svc, learning_svc, event_svc, op_with_learning_parser,320                                  make_test_link, make_test_result, knowledge_svc):321        test_link = make_test_link(1234)322        op_with_learning_parser.add_link(test_link)323        test_result = make_test_result(test_link.id)324        event_loop.run_until_complete(data_svc.store(op_with_learning_parser))325        event_loop.run_until_complete(contact_svc._save(test_result))326        assert len(test_link.facts) == 1327        fact = test_link.facts[0]328        assert fact.trait == 'host.ip.address'329        assert fact.value == '10.10.10.10'330        knowledge_data = event_loop.run_until_complete(op_with_learning_parser.all_facts())331        assert len(knowledge_data) == 1332        assert knowledge_data[0].trait == 'host.ip.address'333        assert knowledge_data[0].value == '10.10.10.10'334    def test_without_learning_parser(self, event_loop, app_svc, contact_svc, data_svc, learning_svc, event_svc,335                                     op_without_learning_parser, make_test_link, make_test_result):336        test_link = make_test_link(5678)337        op_without_learning_parser.add_link(test_link)338        test_result = make_test_result(test_link.id)339        event_loop.run_until_complete(data_svc.store(op_without_learning_parser))340        event_loop.run_until_complete(contact_svc._save(test_result))341        assert len(test_link.facts) == 0342    def test_facts(self, event_loop, app_svc, contact_svc, file_svc, data_svc, learning_svc, event_svc,343                   op_with_learning_and_seeded, make_test_link, make_test_result, knowledge_svc):344        test_link = make_test_link(9876)345        op_with_learning_and_seeded.add_link(test_link)346        test_result = make_test_result(test_link.id)347        event_loop.run_until_complete(data_svc.store(op_with_learning_and_seeded))348        event_loop.run_until_complete(op_with_learning_and_seeded._init_source())  # need to call this manually (no 'run')349        event_loop.run_until_complete(contact_svc._save(test_result))350        assert len(test_link.facts) == 1351        fact = test_link.facts[0]352        assert fact.trait == 'host.ip.address'353        assert fact.value == '10.10.10.10'354        knowledge_data = event_loop.run_until_complete(op_with_learning_and_seeded.all_facts())355        assert len(knowledge_data) == 2356        origin_set = [x.source for x in knowledge_data]357        assert op_with_learning_and_seeded.id in origin_set358        assert op_with_learning_and_seeded.source.id in origin_set359        report = event_loop.run_until_complete(op_with_learning_and_seeded.report(file_svc, data_svc))360        assert len(report['facts']) == 2report.py
Source:report.py  
...57    result = Result()58    _set_result(result, steps, status, start_time, end_time)59    return result60_counter = 0  # used to create some unicity in test/suite names/descriptions61def make_test_result(name=None, description=None, steps=(), status=NotImplemented, start_time=NotImplemented, end_time=NotImplemented):62    global _counter63    test = TestResult(name or "test_%d" % _counter, description or "Test %d" % _counter)64    _counter += 165    _set_result(test, steps, status, start_time, end_time)66    return test67def make_suite_result(name=None, description=None, tests=(), sub_suites=(), setup=None, teardown=None,68                      start_time=NotImplemented, end_time=NotImplemented):69    global _counter70    suite = SuiteResult(name or "suite_%d" % _counter, description or "Suite %d" % _counter)71    _counter += 172    if setup:73        suite.suite_setup = setup74    if teardown:75        suite.suite_teardown = teardown76    for test in tests:77        suite.add_test(test)78    for sub_suite in sub_suites:79        suite.add_suite(sub_suite)80    results = list(flatten_results([suite]))81    if start_time is not NotImplemented:82        suite.start_time = start_time83    elif results:84        suite.start_time = results[0].start_time85    else:86        suite.start_time = time.time()87    if end_time is not NotImplemented:88        suite.end_time = end_time89    elif results:90        suite.end_time = results[-1].end_time91    else:92        suite.end_time = time.time()93    return suite94def make_report(suites=(), setup=None, teardown=None):95    report = Report()96    if setup:97        report.test_session_setup = setup98    for suite in suites:99        report.add_suite(suite)100    if teardown:101        report.test_session_teardown = teardown102    results = list(report.all_results())103    if results:104        report.start_time = results[0].start_time105        report.end_time = results[-1].end_time106        report.saving_time = time.time()107    else:108        report.start_time = report.end_time = report.saving_time = time.time()109    return report110###111# Some helpful fixtures112###113def make_report_in_progress():114    # create a pseudo report where all elements that can be "in-progress" (meaning without115    # an end time) are present in the report116    now = time.time()117    return make_report(118        setup=make_result(start_time=now, end_time=None),119        teardown=make_result(start_time=now, end_time=None),120        suites=[make_suite_result(121            "suite", "suite",122            start_time=now, end_time=None,123            setup=make_result(start_time=now, end_time=None),124            teardown=make_result(start_time=now, end_time=None),125            tests=[126                make_test_result(127                    "test_1", "test_1", start_time=now, end_time=None, status=None,128                    steps=[make_step("step", start_time=now, end_time=None, logs=[make_log("info", "message", ts=now)])]129                ),130                make_test_result(131                    "test_2", "test_2", start_time=now, end_time=now+1, status="passed",132                    steps=[make_step("step", start_time=now, end_time=now+1, logs=[make_log("info", "message", ts=now)])]133                )134            ]135        )]136    )137@pytest.fixture()138def report_in_progress():139    return make_report_in_progress()140@pytest.fixture()141def report_in_progress_path(tmpdir):142    backend = JsonBackend()143    report_path = os.path.join(tmpdir.strpath, "report.json")144    backend.save_report(report_path, make_report_in_progress())...vhdl_run_test_case.py
Source:vhdl_run_test_case.py  
...168    diff_command = diff_command.replace("\\","/")169    print("executing diff command: "+diff_command)170    x=os.system(diff_command)171    return [diff_file, diff_tool]172def make_test_result(child, buildFolder, inputfile, tempoutfile, referencefile,diff_file,diff_tool):173    name = child.get("name")174    entity = str(child.find("entityname").text).strip()175    ret = {}176    ret["name"] = name177    ret["entity"] =entity178    ret["descitption"] =make_description(child, buildFolder)179    ret["InputFile"] = inputfile180    ret["OutputFile"] = tempoutfile181    ret["referencefile"] = referencefile182    ret["diff_file"]= diff_file183    ret["diff_tool"]=diff_tool184    ret["IsDifferent"] = (size_of_file(diff_file) != 0)185    ret["TestType"] =  xml_find_or_defult(child,"tc_type","TestCase")186    return ret187class test_case_runner:188    def __init__(self,buildFolder = "build/", reparse = True,update_reference_file=False, verbose = False ) -> None:189        self.buildFolder = buildFolder190        self.reparse = reparse191        self.update_reference_file = update_reference_file192        self.testResults = []193        self.build_systems = list()194        self.verbose = verbose195    def run(self, FileName):196        FileName= make_rel_linux_path(FileName)197        tree = ET.parse(FileName)198        root = tree.getroot()199        split_test_case(FileName)200        filePath = os.path.dirname(FileName)201     202        print(root)203        for child in root:204            name = child.get("name")205            entity = str(child.find("entityname").text).strip()206            entity_folder = self.buildFolder+entity+"/"207            print("Start Running Test-Case: " + name + " for entity: "+entity)208            if entity not in self.build_systems:209                reparse = build_simulation(self, entity=entity ,  entity_folder=entity_folder)210                211            pre_run_script(child, entity_folder)212            [inputfile, tempoutfile, referencefile] = run_simulation(213                filePath=filePath,214                child=child,215                entity_folder=entity_folder, 216                verbose=self.verbose217            )218            post_run_script(child,entity_folder)219            [diff_file, diff_tool] =diff_output(220                child=child,221                entity_folder=entity_folder,222                tempoutfile=tempoutfile,223                referencefile=referencefile224            )225            226            if self.update_reference_file:227                copyfile(tempoutfile, referencefile)228            229            230            tc_result = make_test_result(231                child, 232                self.buildFolder, 233                inputfile, 234                tempoutfile, 235                referencefile,236                diff_file,237                diff_tool238            )239            240            self.testResults.append(tc_result)241        merge_test_case(FileName)242        return  reparse243    def generate_report(self, FileName):244        content = ""...test_report.py
Source:test_report.py  
...23    _test_timestamp_round(1485105524.353112, 1485105524.353)24def test_report_stats_simple():25    report = make_report(suites=[26        make_suite_result(tests=[27            make_test_result(steps=[make_step(logs=[make_check(True)])])28        ])29    ])30    assert_report_stats(report, expected_passed_tests=1)31def test_report_stats_failure():32    report = make_report(suites=[33        make_suite_result(tests=[34            make_test_result(steps=[make_step(logs=[make_check(True), make_log("error")])])35        ])36    ])37    assert_report_stats(report, expected_failed_tests=1)38def test_report_stats_skipped():39    report = make_report(suites=[40        make_suite_result(tests=[41            make_test_result(status="skipped")42        ])43    ])44    assert_report_stats(report, expected_skipped_tests=1)45def test_test_duration_unfinished():46    test = TstResult("test", "test")47    test.start_time = NOW48    assert test.duration is None49def test_test_duration_finished():50    test = TstResult("test", "test")51    test.start_time = NOW52    test.end_time = NOW + 1.053    assert test.duration == 1.054def test_step_duration_unfinished():55    step = Step("step")56    step.start_time = NOW57    assert step.duration is None58def test_step_duration_finished():59    step = Step("step")60    step.start_time = NOW61    step.end_time = NOW + 1.062    assert step.duration == 1.063def test_suite_duration():64    suite = make_suite_result(tests=[65        make_test_result(start_time=NOW, end_time=NOW+1),66        make_test_result(start_time=NOW+1, end_time=NOW+2)67    ])68    assert suite.duration == 269def test_suite_duration_with_setup():70    suite = make_suite_result(71        setup=make_result(start_time=NOW, end_time=NOW+1),72        tests=[make_test_result(start_time=NOW+1, end_time=NOW+2)],73    )74    assert suite.duration == 275def test_suite_duration_with_teardown():76    suite = make_suite_result(77        tests=[make_test_result(start_time=NOW+1, end_time=NOW+2)],78        teardown=make_result(start_time=NOW+2, end_time=NOW+3)79    )80    assert suite.duration == 281def test_is_successful_is_true():82    @lcc.suite("suite")83    class suite:84        @lcc.test("test")85        def test(self):86            lcc.log_info("some log")87    report = run_suite_class(suite)88    assert report.is_successful()89def test_is_successful():90    @lcc.suite("suite")91    class suite:92        @lcc.test("test")93        def test(self):94            lcc.log_error("some error")95    report = run_suite_class(suite)96    assert not report.is_successful()97def test_is_successful_with_disabled_test():98    @lcc.suite("suite")99    class suite:100        @lcc.test("test_1")101        def test_1(self):102            lcc.log_info("some log")103        @lcc.test("test_2")104        @lcc.disabled()105        def test_2(self):106            lcc.log_info("some log")107    report = run_suite_class(suite)108    assert report.is_successful()109def test_is_successful_with_failed_test():110    @lcc.suite("suite")111    class suite:112        @lcc.test("test")113        def test(self):114            lcc.log_error("some error")115    report = run_suite_class(suite)116    assert not report.is_successful()117def test_is_successful_with_failed_teardown():118    @lcc.suite("suite")119    class suite:120        @lcc.test("test")121        def test(self):122            lcc.log_info("some log")123        def teardown_suite(self):124            lcc.log_error("some error")125    report = run_suite_class(suite)126    assert not report.is_successful()127def test_check_report_message_template_ok():128    template = "{passed} test passed"129    assert check_report_message_template(template) == template130def test_check_report_message_template_ko():131    with pytest.raises(ValueError):132        assert check_report_message_template("{invalid_var}'")133@pytest.fixture()134def report_sample():135    @lcc.suite()136    class suite:137        @lcc.test()138        def test_1(self):139            pass140        @lcc.test()141        @lcc.disabled()142        def test_2(self):143            pass144        @lcc.test()145        def test_3(self):146            lcc.log_error("some issue")147        @lcc.test()148        def test_4(self):149            lcc.log_info("everything ok")150    return run_suite_class(suite)151def test_report_build_message_with_start_time(report_sample):152    assert str(CURRENT_YEAR) in report_sample.build_message("{start_time}")153def test_report_build_message_with_end_time(report_sample):154    assert str(CURRENT_YEAR) in report_sample.build_message("{end_time}")155def test_report_build_message_with_duration(report_sample):156    report_sample.end_time = report_sample.start_time + 1157    assert report_sample.build_message("{duration}") == "1s"158def test_report_build_message_with_total(report_sample):159    assert report_sample.build_message("{total}") == "4"160def test_report_build_message_with_enabled(report_sample):161    assert report_sample.build_message("{enabled}") == "3"162def test_report_build_message_with_passed(report_sample):163    assert report_sample.build_message("{passed}") == "2"164def test_report_build_message_with_passed_pct(report_sample):165    assert report_sample.build_message("{passed_pct}") == "66%"166def test_report_build_message_with_failed(report_sample):167    assert report_sample.build_message("{failed}") == "1"168def test_report_build_message_with_failed_pct(report_sample):169    assert report_sample.build_message("{failed_pct}") == "33%"170def test_report_build_message_with_skipped():171    report = make_report([172        make_suite_result(tests=[173            make_test_result(status="passed"), make_test_result(status="passed"), make_test_result(status="skipped")174        ])175    ])176    assert report.build_message("{skipped}") == "1"177def test_report_build_message_with_skipped_pct():178    report = make_report([179        make_suite_result(tests=[180            make_test_result(status="passed"), make_test_result(status="passed"), make_test_result(status="skipped")181        ])182    ])183    assert report.build_message("{skipped_pct}") == "33%"184def test_report_build_message_with_disabled(report_sample):185    assert report_sample.build_message("{disabled}") == "1"186def test_report_build_message_with_disabled_pct(report_sample):187    assert report_sample.build_message("{disabled_pct}") == "25%"...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
