Best Python code snippet using sure_python
phase_collections_test.py
Source:phase_collections_test.py  
"""Unit tests for the phase collections library."""

import unittest
from unittest import mock

import openhtf as htf
from openhtf import plugs
from openhtf.core import base_plugs
from openhtf.core import phase_collections
from openhtf.core import phase_descriptor
from openhtf.core import phase_executor
from openhtf.core import phase_group
from openhtf.core import phase_nodes
from openhtf.core import test_record
from openhtf.util import test as htf_test


# Builds a single htf_test.PhaseNodeNameComparable for the given name; used
# below as the expected value when comparing flattened nodes.
def _create_node(name):
  return htf_test.PhaseNodeNameComparable(name)


# Builds a list of comparable nodes, one per name, in the order given.
def _create_nodes(*names):
  return [_create_node(n) for n in names]


# Applies PhaseOptions with the name 'prefix:<original name>' to `p` and
# returns the result; used as the callable for apply_to_all_phases.
def _prefix_name(p):
  return phase_descriptor.PhaseOptions(name='prefix:' + p.name)(p)


# Plain, undecorated phase function that does nothing.
def phase():
  pass


# Phase that returns PhaseResult.FAIL_SUBTEST.
def fail_subtest_phase():
  return phase_descriptor.PhaseResult.FAIL_SUBTEST


# Exception type raised by error_phase so tests can assert on the exact type.
class BrokenError(Exception):
  pass


# Phase that always raises BrokenError.
def error_phase():
  raise BrokenError('broken')


# No-op phase used as the teardown of `teardown_group`.
def teardown_phase():
  pass


# Group with only a teardown phase; tests call teardown_group.wrap(...) to put
# other phases in front of that teardown.
teardown_group = phase_group.PhaseGroup(teardown=teardown_phase)


# The phases below are declared through PhaseOptions() with no options set.
@phase_descriptor.PhaseOptions()
def empty_phase():
  pass


@phase_descriptor.PhaseOptions()
def skip_phase():
  pass


@phase_descriptor.PhaseOptions()
def skip_phase0():
  pass


@phase_descriptor.PhaseOptions()
def skip_phase1():
  pass


# Phase with one optional argument, used by the with_args test.
@phase_descriptor.PhaseOptions()
def phase_with_args(arg1=None):
  del arg1


# Plug hierarchy used by the with_plugs test: the phase declares a placeholder
# for ParentPlug and the test substitutes ChildPlug.
class ParentPlug(base_plugs.BasePlug):
  pass


class ChildPlug(ParentPlug):
  pass


@plugs.plug(my_plug=ParentPlug.placeholder)
def plug_phase(my_plug):
  del my_plug  # Unused.


class FlattenTest(unittest.TestCase):
  """Tests for phase_collections.flatten."""

  # A single node flattens to a one-element list.
  def test_single_node(self):
    node = _create_node('a')
    expected = _create_nodes('a')
    self.assertEqual(expected, phase_collections.flatten(node))

  # A flat list of nodes is returned in the same order.
  def test_iterable_flat(self):
    node1 = _create_node('1')
    node2 = _create_node('2')
    node3 = _create_node('3')
    expected = _create_nodes('1', '2', '3')
    self.assertEqual(expected, phase_collections.flatten([node1, node2, node3]))

  # A bare phase function flattens to a one-element list that compares equal
  # to a node named 'phase'.
  def test_single_phase(self):
    expected = _create_nodes('phase')
    self.assertEqual(expected, phase_collections.flatten(phase))

  # Nested lists, mixing nodes and a bare function, flatten depth-first in
  # order.
  def test_iterable_of_iterable(self):
    nodes = [[_create_node('1')],
             [[_create_node('2'), _create_node('3')], [_create_node('4')],
              _create_node('5')],
             _create_node('6'), phase]
    expected = _create_nodes('1', '2', '3', '4', '5', '6', 'phase')
    self.assertEqual(expected, phase_collections.flatten(nodes))

  # An input that is neither a node, a callable, nor an iterable (here an int)
  # is expected to raise ValueError.
  def test_invalid_entry(self):
    nodes = 42
    with self.assertRaises(ValueError):
      phase_collections.flatten(nodes)

  # A PhaseSequence inside a list is kept as one element, not expanded into
  # its nodes.
  def test_flatten_single_list(self):
    seq = htf.PhaseSequence(_create_nodes('1', '2'))
    expected = [htf.PhaseSequence(_create_nodes('1', '2'))]
    self.assertEqual(expected, phase_collections.flatten([seq]))


class PhaseSequenceTest(unittest.TestCase):
  """Unit tests for phase_collections.PhaseSequence."""

  # Passing both positional nodes and the `nodes` keyword raises ValueError.
  def test_init__nodes_and_args(self):
    with self.assertRaises(ValueError):
      phase_collections.PhaseSequence(phase, nodes=tuple(_create_nodes('1')))

  # An unknown keyword argument raises ValueError.
  def test_init__extra_kwargs(self):
    with self.assertRaises(ValueError):
      phase_collections.PhaseSequence(other=1)

  # A single callable passed positionally compares equal to a sequence built
  # from the explicitly wrapped descriptor.
  def test_init__single_callable(self):
    expected = phase_collections.PhaseSequence(
        nodes=tuple((phase_descriptor.PhaseDescriptor.wrap_or_copy(phase),)))
    self.assertEqual(expected, phase_collections.PhaseSequence(phase))

  # _asdict() yields the sequence name plus one dict per node.
  def test_asdict(self):
    expected = {
        'name': 'sequence_name',
        'nodes': [{
            'name': '1'
        }, {
            'name': '2'
        }],
    }
    seq = phase_collections.PhaseSequence(
        _create_nodes('1', '2'), name='sequence_name')
    self.assertEqual(expected, seq._asdict())

  # with_args is forwarded to every node: the phase without arguments is
  # unchanged, phase_with_args only picks up arg1, and the mocked node
  # receives all keyword arguments as given.
  def test_with_args(self):
    mock_node = mock.create_autospec(phase_nodes.PhaseNode)
    seq = phase_collections.PhaseSequence(
        nodes=(empty_phase, phase_with_args, mock_node), name='seq')

    updated = seq.with_args(arg1=1, ignored_arg=2)
    self.assertEqual(seq.name, updated.name)
    self.assertEqual(empty_phase, updated.nodes[0])
    self.assertEqual(phase_with_args.with_args(arg1=1), updated.nodes[1])
    self.assertEqual(mock_node.with_args.return_value, updated.nodes[2])
    mock_node.with_args.assert_called_once_with(arg1=1, ignored_arg=2)

  # with_plugs is forwarded to every node, mirroring test_with_args.
  def test_with_plugs(self):
    mock_node = mock.create_autospec(phase_nodes.PhaseNode)
    seq = phase_collections.PhaseSequence(
        nodes=(empty_phase, plug_phase, mock_node), name='seq')

    updated = seq.with_plugs(my_plug=ChildPlug, ignored_plug=ParentPlug)
    self.assertEqual(seq.name, updated.name)
    self.assertEqual(empty_phase, updated.nodes[0])
    self.assertEqual(plug_phase.with_plugs(my_plug=ChildPlug), updated.nodes[1])
    self.assertEqual(mock_node.with_plugs.return_value, updated.nodes[2])
    mock_node.with_plugs.assert_called_once_with(
        my_plug=ChildPlug, ignored_plug=ParentPlug)

  # load_code_info fills code_info on the contained phases and delegates to
  # the mocked node with no arguments.
  def test_load_code_info(self):
    mock_node = mock.create_autospec(phase_nodes.PhaseNode)
    seq = phase_collections.PhaseSequence(
        nodes=(empty_phase, plug_phase, mock_node), name='seq')

    updated = seq.load_code_info()
    self.assertEqual(seq.name, updated.name)
    phases = list(updated.all_phases())
    self.assertEqual(
        test_record.CodeInfo.for_function(empty_phase.func),
        phases[0].code_info)
    self.assertEqual(
        test_record.CodeInfo.for_function(plug_phase.func), phases[1].code_info)
    self.assertEqual(mock_node.load_code_info.return_value, updated.nodes[2])
    mock_node.load_code_info.assert_called_once_with()

  # apply_to_all_phases applies the function to each phase and passes the same
  # function on to the mocked node.
  def test_apply_to_all_phases(self):
    mock_node = mock.create_autospec(phase_nodes.PhaseNode)
    seq = phase_collections.PhaseSequence(
        nodes=(empty_phase, plug_phase, mock_node), name='seq')

    updated = seq.apply_to_all_phases(_prefix_name)
    self.assertEqual(seq.name, updated.name)
    self.assertEqual(_prefix_name(empty_phase), updated.nodes[0])
    self.assertEqual(_prefix_name(plug_phase), updated.nodes[1])
    self.assertEqual(mock_node.apply_to_all_phases.return_value,
                     updated.nodes[2])
    mock_node.apply_to_all_phases.assert_called_once_with(_prefix_name)

  # all_phases yields only the two real phases; nothing is yielded for the
  # autospec'd node.
  def test_all_phases(self):
    mock_node = mock.create_autospec(phase_nodes.PhaseNode)
    seq = phase_collections.PhaseSequence(
        nodes=(empty_phase, plug_phase, mock_node), name='seq')
    self.assertEqual([empty_phase, plug_phase], list(seq.all_phases()))


class PhaseSequenceIntegrationTest(htf_test.TestCase):
  """Runs PhaseSequence through a real htf.Test via yields_phases."""

  # A sequence nested inside a sequence runs both inner phases to PASS.
  @htf_test.yields_phases
  def test_nested(self):
    seq = phase_collections.PhaseSequence(
        phase_collections.PhaseSequence(phase, empty_phase))

    test_rec = yield htf.Test(seq)

    self.assertTestPass(test_rec)
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.PASS, test_rec,
                                   'phase', 'empty_phase')


class SubtestTest(unittest.TestCase):
  """Unit tests for Subtest construction and duplicate-name checking."""

  # The first positional argument becomes the subtest name.
  def test_init__name(self):
    subtest = phase_collections.Subtest('subtest', phase)
    self.assertEqual('subtest', subtest.name)

  # Two sibling subtests with the same name are rejected.
  def test_check_duplicates__dupes(self):
    seq = phase_collections.PhaseSequence(
        nodes=(phase_collections.Subtest('dupe'),
               phase_collections.Subtest('dupe')))
    with self.assertRaises(phase_collections.DuplicateSubtestNamesError):
      phase_collections.check_for_duplicate_subtest_names(seq)

  # A subtest nested inside a subtest of the same name is also rejected.
  def test_check_duplicates__nested_dupes(self):
    seq = phase_collections.PhaseSequence(
        nodes=(phase_collections.Subtest(
            'dupe', nodes=(phase_collections.Subtest('dupe'),)),))
    with self.assertRaises(phase_collections.DuplicateSubtestNamesError):
      phase_collections.check_for_duplicate_subtest_names(seq)


class SubtestIntegrationTest(htf_test.TestCase):
  """Runs Subtest nodes through a real htf.Test via yields_phases.

  The tests read phase records positionally starting at test_rec.phases[1];
  presumably index 0 is a phase the framework adds itself -- TODO confirm.
  """

  # A passing subtest yields one PASS SubtestRecord and tags its phase with
  # the subtest name.
  @htf_test.yields_phases
  def test_pass(self):
    subtest = phase_collections.Subtest('subtest', phase)

    test_rec = yield htf.Test(subtest)

    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.PASS, test_rec,
                                   'phase')
    self.assertTestPass(test_rec)
    self.assertEqual([
        test_record.SubtestRecord(
            name='subtest',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.PASS),
    ], test_rec.subtests)
    self.assertEqual('subtest', test_rec.phases[-1].subtest_name)

  # FAIL_SUBTEST fails the test and skips the rest of the subtest, but the
  # phase after the subtest still runs and passes.
  @htf_test.yields_phases
  def test_fail_but_still_continues(self):
    subtest = phase_collections.Subtest('failure', fail_subtest_phase,
                                        skip_phase)

    test_rec = yield htf.Test(subtest, phase)

    self.assertTestFail(test_rec)

    fail_phase_rec = test_rec.phases[1]
    self.assertPhaseOutcomeFail(fail_phase_rec)
    self.assertPhaseFailSubtest(fail_phase_rec)
    self.assertEqual('failure', fail_phase_rec.subtest_name)

    skip_phase_rec = test_rec.phases[2]
    self.assertPhaseOutcomeSkip(skip_phase_rec)
    self.assertPhaseSkip(skip_phase_rec)
    self.assertEqual('failure', skip_phase_rec.subtest_name)

    # The phase outside the subtest carries no subtest name.
    continue_phase_rec = test_rec.phases[3]
    self.assertPhaseOutcomePass(continue_phase_rec)
    self.assertPhaseContinue(continue_phase_rec)
    self.assertIsNone(continue_phase_rec.subtest_name)

    self.assertEqual([
        test_record.SubtestRecord(
            name='failure',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.FAIL),
    ], test_rec.subtests)

  # An exception inside a subtest errors the whole test; the subtest outcome
  # is STOP and the phase after the subtest is not run.
  @htf_test.yields_phases
  def test_error(self):
    subtest = phase_collections.Subtest('subtest', error_phase)

    test_rec = yield htf.Test(subtest, phase)

    self.assertTestError(test_rec)

    # NOTE(review): this asserts PASS for 'phase' while assertPhasesNotRun
    # below asserts 'phase' never ran. Presumably the by-name check passes
    # when no record matches, which would make it a no-op here -- confirm.
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.PASS, test_rec,
                                   'phase')
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.ERROR, test_rec,
                                   'error_phase')
    self.assertPhasesNotRun(test_rec, 'phase')

    error_phase_rec = test_rec.phases[1]
    self.assertPhaseError(error_phase_rec, exc_type=BrokenError)

    self.assertEqual([
        test_record.SubtestRecord(
            name='subtest',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.STOP),
    ], test_rec.subtests)

  # A subtest wrapping a group with a teardown passes and runs both phases.
  @htf_test.yields_phases
  def test_pass__with_group(self):
    subtest = phase_collections.Subtest('subtest', teardown_group.wrap(phase))

    test_rec = yield htf.Test(subtest)

    self.assertTestPass(test_rec)
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.PASS, test_rec,
                                   'phase', 'teardown_phase')

    self.assertEqual([
        test_record.SubtestRecord(
            name='subtest',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.PASS),
    ], test_rec.subtests)

  # When the failing phase is inside a group, that group's teardown still
  # runs; the remaining subtest phase is skipped and the phase after the
  # subtest runs.
  @htf_test.yields_phases
  def test_fail__with_group(self):
    subtest = phase_collections.Subtest('it_fails',
                                        teardown_group.wrap(fail_subtest_phase),
                                        skip_phase)

    test_rec = yield htf.Test(subtest, phase)

    self.assertTestFail(test_rec)

    fail_phase_rec = test_rec.phases[1]
    self.assertEqual('fail_subtest_phase', fail_phase_rec.name)
    self.assertPhaseOutcomeFail(fail_phase_rec)
    self.assertPhaseFailSubtest(fail_phase_rec)
    self.assertEqual('it_fails', fail_phase_rec.subtest_name)

    teardown_phase_rec = test_rec.phases[2]
    self.assertEqual('teardown_phase', teardown_phase_rec.name)
    self.assertPhaseContinue(teardown_phase_rec)
    self.assertPhaseOutcomePass(teardown_phase_rec)
    self.assertEqual('it_fails', teardown_phase_rec.subtest_name)

    skip_phase_rec = test_rec.phases[3]
    self.assertEqual('skip_phase', skip_phase_rec.name)
    self.assertPhaseSkip(skip_phase_rec)
    self.assertPhaseOutcomeSkip(skip_phase_rec)
    self.assertEqual('it_fails', skip_phase_rec.subtest_name)

    continue_phase_rec = test_rec.phases[4]
    self.assertEqual('phase', continue_phase_rec.name)
    self.assertPhaseOutcomePass(continue_phase_rec)
    self.assertPhaseContinue(continue_phase_rec)
    self.assertIsNone((continue_phase_rec.subtest_name))

    self.assertEqual([
        test_record.SubtestRecord(
            name='it_fails',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.FAIL),
    ], test_rec.subtests)

  # A group that comes after the failing phase is skipped entirely, including
  # its teardown.
  @htf_test.yields_phases
  def test_fail__with_nested_group_skipped(self):
    subtest = phase_collections.Subtest(
        'it_fails', fail_subtest_phase,
        htf.PhaseGroup(main=[skip_phase0], teardown=[skip_phase1]), skip_phase)

    test_rec = yield htf.Test(subtest, phase)

    self.assertTestFail(test_rec)

    fail_phase_rec = test_rec.phases[1]
    self.assertEqual('fail_subtest_phase', fail_phase_rec.name)
    self.assertPhaseOutcomeFail(fail_phase_rec)
    self.assertPhaseFailSubtest(fail_phase_rec)
    self.assertEqual('it_fails', fail_phase_rec.subtest_name)

    skip_phase0_rec = test_rec.phases[2]
    self.assertEqual('skip_phase0', skip_phase0_rec.name)
    self.assertPhaseSkip(skip_phase0_rec)
    self.assertPhaseOutcomeSkip(skip_phase0_rec)
    self.assertEqual('it_fails', skip_phase0_rec.subtest_name)

    skip_phase1_rec = test_rec.phases[3]
    self.assertEqual('skip_phase1', skip_phase1_rec.name)
    self.assertPhaseSkip(skip_phase1_rec)
    self.assertPhaseOutcomeSkip(skip_phase1_rec)
    self.assertEqual('it_fails', skip_phase1_rec.subtest_name)

    skip_phase_rec = test_rec.phases[4]
    self.assertEqual('skip_phase', skip_phase_rec.name)
    self.assertPhaseSkip(skip_phase_rec)
    self.assertPhaseOutcomeSkip(skip_phase_rec)
    self.assertEqual('it_fails', skip_phase_rec.subtest_name)

    continue_phase_rec = test_rec.phases[5]
    self.assertEqual('phase', continue_phase_rec.name)
    self.assertPhaseOutcomePass(continue_phase_rec)
    self.assertPhaseContinue(continue_phase_rec)
    self.assertIsNone((continue_phase_rec.subtest_name))

    self.assertEqual([
        test_record.SubtestRecord(
            name='it_fails',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.FAIL),
    ], test_rec.subtests)

  # A failure in a group's setup skips that group's main and teardown phases
  # as well as the rest of the subtest.
  @htf_test.yields_phases
  def test_fail__with_nested_group_fail_in_setup(self):
    subtest = phase_collections.Subtest(
        'it_fails',
        htf.PhaseGroup(
            setup=[fail_subtest_phase],
            main=[skip_phase0],
            teardown=[skip_phase1]), skip_phase)

    test_rec = yield htf.Test(subtest, phase)

    self.assertTestFail(test_rec)

    fail_phase_rec = test_rec.phases[1]
    self.assertEqual('fail_subtest_phase', fail_phase_rec.name)
    self.assertPhaseOutcomeFail(fail_phase_rec)
    self.assertPhaseFailSubtest(fail_phase_rec)
    self.assertEqual('it_fails', fail_phase_rec.subtest_name)

    skip_phase0_rec = test_rec.phases[2]
    self.assertEqual('skip_phase0', skip_phase0_rec.name)
    self.assertPhaseSkip(skip_phase0_rec)
    self.assertPhaseOutcomeSkip(skip_phase0_rec)
    self.assertEqual('it_fails', skip_phase0_rec.subtest_name)

    skip_phase1_rec = test_rec.phases[3]
    self.assertEqual('skip_phase1', skip_phase1_rec.name)
    self.assertPhaseSkip(skip_phase1_rec)
    self.assertPhaseOutcomeSkip(skip_phase1_rec)
    self.assertEqual('it_fails', skip_phase1_rec.subtest_name)

    skip_phase_rec = test_rec.phases[4]
    self.assertEqual('skip_phase', skip_phase_rec.name)
    self.assertPhaseSkip(skip_phase_rec)
    self.assertPhaseOutcomeSkip(skip_phase_rec)
    self.assertEqual('it_fails', skip_phase_rec.subtest_name)

    continue_phase_rec = test_rec.phases[5]
    self.assertEqual('phase', continue_phase_rec.name)
    self.assertPhaseOutcomePass(continue_phase_rec)
    self.assertPhaseContinue(continue_phase_rec)
    self.assertIsNone((continue_phase_rec.subtest_name))

    self.assertEqual([
        test_record.SubtestRecord(
            name='it_fails',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.FAIL),
    ], test_rec.subtests)

  # A failure in the middle of a teardown does not stop the remaining
  # teardown phase; only the rest of the subtest is skipped.
  @htf_test.yields_phases
  def test_fail__with_nested_group_fail_in_teardown(self):
    subtest = phase_collections.Subtest(
        'it_fails',
        htf.PhaseGroup(
            main=[empty_phase], teardown=[fail_subtest_phase, teardown_phase]),
        skip_phase)

    test_rec = yield htf.Test(subtest, phase)

    self.assertTestFail(test_rec)

    empty_phase_rec = test_rec.phases[1]
    self.assertEqual('empty_phase', empty_phase_rec.name)
    self.assertPhaseOutcomePass(empty_phase_rec)
    self.assertPhaseContinue(empty_phase_rec)
    self.assertEqual('it_fails', empty_phase_rec.subtest_name)

    fail_phase_rec = test_rec.phases[2]
    self.assertEqual('fail_subtest_phase', fail_phase_rec.name)
    self.assertPhaseOutcomeFail(fail_phase_rec)
    self.assertPhaseFailSubtest(fail_phase_rec)
    self.assertEqual('it_fails', fail_phase_rec.subtest_name)

    teardown_phase_rec = test_rec.phases[3]
    self.assertEqual('teardown_phase', teardown_phase_rec.name)
    self.assertPhaseContinue(teardown_phase_rec)
    self.assertPhaseOutcomePass(teardown_phase_rec)
    self.assertEqual('it_fails', teardown_phase_rec.subtest_name)

    skip_phase_rec = test_rec.phases[4]
    self.assertEqual('skip_phase', skip_phase_rec.name)
    self.assertPhaseSkip(skip_phase_rec)
    self.assertPhaseOutcomeSkip(skip_phase_rec)
    self.assertEqual('it_fails', skip_phase_rec.subtest_name)

    continue_phase_rec = test_rec.phases[5]
    self.assertEqual('phase', continue_phase_rec.name)
    self.assertPhaseOutcomePass(continue_phase_rec)
    self.assertPhaseContinue(continue_phase_rec)
    self.assertIsNone((continue_phase_rec.subtest_name))

    self.assertEqual([
        test_record.SubtestRecord(
            name='it_fails',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.FAIL),
    ], test_rec.subtests)

  # An exception inside a wrapped group still runs that group's teardown; the
  # subtest outcome is STOP.
  @htf_test.yields_phases
  def test_error__with_group(self):
    subtest = phase_collections.Subtest('it_errors',
                                        teardown_group.wrap(error_phase))

    test_rec = yield htf.Test(subtest, phase)

    self.assertTestError(test_rec)

    error_phase_rec = test_rec.phases[1]
    self.assertEqual('error_phase', error_phase_rec.name)
    self.assertPhaseOutcomeError(error_phase_rec)
    self.assertPhaseError(error_phase_rec, exc_type=BrokenError)
    self.assertEqual('it_errors', error_phase_rec.subtest_name)

    teardown_phase_rec = test_rec.phases[2]
    self.assertEqual('teardown_phase', teardown_phase_rec.name)
    self.assertPhaseContinue(teardown_phase_rec)
    self.assertPhaseOutcomePass(teardown_phase_rec)
    self.assertEqual('it_errors', teardown_phase_rec.subtest_name)

    self.assertEqual([
        test_record.SubtestRecord(
            name='it_errors',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.STOP),
    ], test_rec.subtests)

  # Nested subtests: each phase is tagged with its innermost subtest, and the
  # inner SubtestRecord is listed before the outer one.
  @htf_test.yields_phases
  def test_nested__pass(self):
    subtest = phase_collections.Subtest(
        'outer', phase, phase_collections.Subtest('inner', phase))

    test_rec = yield htf.Test(subtest)

    self.assertTestPass(test_rec)

    outer_phase_rec = test_rec.phases[1]
    self.assertEqual('outer', outer_phase_rec.subtest_name)

    inner_phase_rec = test_rec.phases[2]
    self.assertEqual('inner', inner_phase_rec.subtest_name)

    self.assertEqual([
        test_record.SubtestRecord(
            name='inner',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.PASS),
        test_record.SubtestRecord(
            name='outer',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.PASS),
    ], test_rec.subtests)

  # A failure in the inner subtest only fails the inner subtest: the outer
  # subtest continues with its next phase and is recorded as PASS, while the
  # test as a whole fails.
  @htf_test.yields_phases
  def test_nested__fail(self):
    subtest = phase_collections.Subtest(
        'outer', phase,
        phase_collections.Subtest('inner', fail_subtest_phase, skip_phase),
        empty_phase)

    test_rec = yield htf.Test(subtest)

    self.assertTestFail(test_rec)

    outer_phase_rec = test_rec.phases[1]
    self.assertEqual('phase', outer_phase_rec.name)
    self.assertEqual('outer', outer_phase_rec.subtest_name)
    self.assertPhaseOutcomePass(outer_phase_rec)

    inner_phase_rec = test_rec.phases[2]
    self.assertEqual('fail_subtest_phase', inner_phase_rec.name)
    self.assertEqual('inner', inner_phase_rec.subtest_name)
    self.assertPhaseOutcomeFail(inner_phase_rec)

    skip_phase_rec = test_rec.phases[3]
    self.assertEqual('skip_phase', skip_phase_rec.name)
    self.assertEqual('inner', skip_phase_rec.subtest_name)
    self.assertPhaseOutcomeSkip(skip_phase_rec)

    outer_phase2_rec = test_rec.phases[4]
    self.assertEqual('empty_phase', outer_phase2_rec.name)
    self.assertEqual('outer', outer_phase2_rec.subtest_name)
    self.assertPhaseOutcomePass(outer_phase2_rec)

    self.assertEqual([
        test_record.SubtestRecord(
            name='inner',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.FAIL),
        test_record.SubtestRecord(
            name='outer',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.PASS),
    ], test_rec.subtests)

  # Returning FAIL_SUBTEST outside of any subtest is an error
  # (InvalidPhaseResultError) on both the phase and the test.
  @htf_test.yields_phases
  def test_fail_subtest__not_in_subtest(self):
    test_rec = yield htf.Test(fail_subtest_phase, phase)

    self.assertTestError(
        test_rec, exc_type=phase_executor.InvalidPhaseResultError)

    fail_phase_rec = test_rec.phases[1]
    self.assertPhaseError(
        fail_phase_rec, exc_type=phase_executor.InvalidPhaseResultError)
    self.assertPhaseOutcomeError(fail_phase_rec)
    self.assertIsNone(fail_phase_rec.subtest_name)

  # When the outer subtest fails first, the nested subtest that follows is
  # skipped too and both subtests are recorded as FAIL.
  @htf_test.yields_phases
  def test_fail_subtest__nested_subtest_also_skipped(self):
    subtest = phase_collections.Subtest(
        'outer', fail_subtest_phase, skip_phase0,
        phase_collections.Subtest('inner', skip_phase), skip_phase1)

    test_rec = yield htf.Test(subtest, phase)

    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.FAIL, test_rec,
                                   'fail_subtest_phase')
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.SKIP, test_rec,
                                   'skip_phase0', 'skip_phase', 'skip_phase1')
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.PASS, test_rec,
                                   'phase')

    self.assertEqual([
        test_record.SubtestRecord(
            name='inner',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.FAIL),
        test_record.SubtestRecord(
            name='outer',
            start_time_millis=htf_test.VALID_TIMESTAMP,
            end_time_millis=htf_test.VALID_TIMESTAMP,
            outcome=test_record.SubtestOutcome.FAIL),
    ], test_rec.subtests)

  # A checkpoint that follows the failing phase inside the subtest is recorded
  # with a skip result.
  @htf_test.yields_phases
  def test_fail_subtest__skip_checkpoint(self):
    subtest = phase_collections.Subtest(
        'skip_checkpoint', fail_subtest_phase,
        htf.PhaseFailureCheckpoint('must_be_skipped'), skip_phase)

    test_rec = yield htf.Test(subtest, phase)

    self.assertTestFail(test_rec)

    fail_phase_rec = test_rec.phases[1]
    self.assertPhaseFailSubtest(fail_phase_rec)
    self.assertPhaseOutcomeFail(fail_phase_rec)

    skip_phase_rec = test_rec.phases[2]
    self.assertPhaseOutcomeSkip(skip_phase_rec)

    continue_phase_rec = test_rec.phases[3]
    self.assertPhaseOutcomePass(continue_phase_rec)

    self.assertTrue(test_rec.checkpoints[0].result.is_skip)

  # The branch condition (_Diag.NOT_SET) is never produced by any phase here,
  # and error_phase inside the branch must not appear among the run phases.
  @htf_test.yields_phases
  def test_fail_subtest__skip_branch_that_would_not_run(self):

    class _Diag(htf.DiagResultEnum):
      NOT_SET = 'not_set'

    subtest = phase_collections.Subtest(
        'skip_branch', fail_subtest_phase,
        htf.BranchSequence(_Diag.NOT_SET, error_phase), skip_phase)

    test_rec = yield htf.Test(subtest, phase)

    self.assertTestFail(test_rec)

    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.FAIL, test_rec,
                                   'fail_subtest_phase')
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.SKIP, test_rec,
                                   'skip_phase')
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.PASS, test_rec,
                                   'phase')
    self.assertPhasesNotRun(test_rec, 'error_phase')

  # Here diag_phase runs before the subtest and its diagnoser returns
  # _Diag.SET, the result the branch condition is built on. The test still
  # ends as FAIL (not ERROR), with skip_phase skipped.
  @htf_test.yields_phases
  def test_fail_subtest__skip_branch_that_would_run(self):

    class _Diag(htf.DiagResultEnum):
      SET = 'set'

    @htf.PhaseDiagnoser(_Diag)
    def diagnoser(phase_rec):
      del phase_rec  # Unused.
      return htf.Diagnosis(_Diag.SET)

    @htf.diagnose(diagnoser)
    def diag_phase():
      pass

    subtest = phase_collections.Subtest(
        'skip_branch', fail_subtest_phase,
        htf.BranchSequence(
            htf.DiagnosisCondition.on_all(_Diag.SET), error_phase), skip_phase)

    test_rec = yield htf.Test(diag_phase, subtest, phase)

    self.assertTestFail(test_rec)

    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.FAIL, test_rec,
                                   'fail_subtest_phase')
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.SKIP, test_rec,
                                   'skip_phase')
    self.assertPhasesOutcomeByName(test_record.PhaseOutcome.PASS, test_rec,
                                   'diag_phase', 'phase')
...test_py_cast.py
Source:test_py_cast.py  
"""
Tests related to casting and vector/array composition.
"""

import ctypes

import pyshader
from pyshader import f32, f64, u8, i16, i32, i64  # noqa
from pyshader import bvec2, ivec2, ivec3, vec2, vec3, vec4, Array  # noqa
import wgpu.backends.rs  # noqa
from wgpu.utils import compute_with_buffers
import pytest
from testutils import can_use_wgpu_lib, iters_equal
from testutils import validate_module, run_test_and_print_new_hashes

# Every test below follows the same shape: define `compute_shader` (converted
# and validated by the python2shader_and_validate decorator defined further
# down), skip if wgpu is unavailable, run the shader through
# compute_with_buffers and compare output buffer 1 with the expected values.


# i32 -> f32. Unlike the other tests, the argument types are given as strings
# ("ivec3", "Array(i32)") instead of the imported type objects.
def test_cast_i32_f32():
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", "ivec3"),
        data1: ("buffer", 0, "Array(i32)"),
        data2: ("buffer", 1, "Array(f32)"),
    ):
        i = index.x
        data2[i] = f32(data1[i])

    skip_if_no_wgpu()

    values1 = [-999999, -100, -4, 0, 4, 100, 32767, 32768, 999999]

    inp_arrays = {0: (ctypes.c_int32 * len(values1))(*values1)}
    out_arrays = {1: ctypes.c_float * len(values1)}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader)
    assert iters_equal(out[1], values1)


# u8 -> f32.
def test_cast_u8_f32():
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(u8)),
        data2: ("buffer", 1, Array(f32)),
    ):
        i = index.x
        data2[i] = f32(data1[i])

    skip_if_no_wgpu()

    values1 = [0, 1, 4, 127, 128, 255]

    inp_arrays = {0: (ctypes.c_ubyte * len(values1))(*values1)}
    out_arrays = {1: ctypes.c_float * len(values1)}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader)
    assert iters_equal(out[1], values1)


# f32 -> i32, using the whole numbers 0..19 as input.
def test_cast_f32_i32():
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(f32)),
        data2: ("buffer", 1, Array(i32)),
    ):
        i = index.x
        data2[i] = i32(data1[i])

    skip_if_no_wgpu()

    inp_arrays = {0: (ctypes.c_float * 20)(*range(20))}
    out_arrays = {1: ctypes.c_int32 * 20}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader)
    assert iters_equal(out[1], range(20))


# f32 -> f32: casting to the type the value already has.
def test_cast_f32_f32():
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(f32)),
        data2: ("buffer", 1, Array(f32)),
    ):
        i = index.x
        data2[i] = f32(data1[i])

    skip_if_no_wgpu()

    inp_arrays = {0: (ctypes.c_float * 20)(*range(20))}
    out_arrays = {1: ctypes.c_float * 20}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader)
    assert iters_equal(out[1], range(20))


# f32 -> f64.
def test_cast_f32_f64():
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(f32)),
        data2: ("buffer", 1, Array(f64)),
    ):
        i = index.x
        data2[i] = f64(data1[i])

    skip_if_no_wgpu()

    inp_arrays = {0: (ctypes.c_float * 20)(*range(20))}
    out_arrays = {1: ctypes.c_double * 20}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader)
    assert iters_equal(out[1], range(20))


# i64 -> i16. values2 holds the low 16 bits of each input read as a signed
# value (e.g. 999999 -> 16959, 32768 -> -32768).
def test_cast_i64_i16():
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(i64)),
        data2: ("buffer", 1, Array(i16)),
    ):
        i = index.x
        data2[i] = i16(data1[i])

    skip_if_no_wgpu()

    values1 = [-999999, -100, -4, 0, 4, 100, 32767, 32768, 999999]
    values2 = [-16959, -100, -4, 0, 4, 100, 32767, -32768, 16959]

    inp_arrays = {0: (ctypes.c_longlong * len(values1))(*values1)}
    out_arrays = {1: ctypes.c_short * len(values1)}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader)
    assert iters_equal(out[1], values2)


# i16 -> u8. values2 holds each input modulo 256 (e.g. -3 -> 253, 300 -> 44).
def test_cast_i16_u8():
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(i16)),
        data2: ("buffer", 1, Array(u8)),
    ):
        i = index.x
        data2[i] = u8(data1[i])

    skip_if_no_wgpu()

    values1 = [-3, -2, -1, 0, 1, 2, 3, 127, 128, 255, 256, 300]
    values2 = [253, 254, 255, 0, 1, 2, 3, 127, 128, 255, 0, 44]

    inp_arrays = {0: (ctypes.c_short * len(values1))(*values1)}
    out_arrays = {1: ctypes.c_ubyte * len(values1)}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader)
    assert iters_equal(out[1], values2)


# ivec2 -> vec2. The 10 scalars form 5 two-component vectors, hence n=5.
def test_cast_vec_ivec2_vec2():
    # This triggers the direct number-vector conversion
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(ivec2)),
        data2: ("buffer", 1, Array(vec2)),
    ):
        i = index.x
        data2[i] = vec2(data1[i])

    skip_if_no_wgpu()

    values1 = [-999999, -100, -4, 1, 4, 100, 32767, 32760, 0, 999999]

    inp_arrays = {0: (ctypes.c_int32 * len(values1))(*values1)}
    out_arrays = {1: ctypes.c_float * len(values1)}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader, n=5)
    assert iters_equal(out[1], values1)


# Builds a vec4 from a float, an int and an ivec2 made of a bool and a float.
# The expected output [7.0, 3.0, 0.0, 2.0] shows False -> 0 and 2.7 -> 2.
# There is no input buffer; n=2 produces two such vectors.
def test_cast_vec_any_vec4():
    # Look how all args in a vector are converted :)
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data2: ("buffer", 1, Array(vec4)),
    ):
        data2[index.x] = vec4(7.0, 3, ivec2(False, 2.7))

    skip_if_no_wgpu()

    values2 = [7.0, 3.0, 0.0, 2.0] * 2

    inp_arrays = {}
    out_arrays = {1: ctypes.c_float * len(values2)}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader, n=2)
    assert iters_equal(out[1], values2)


# Disabled: the bare `return` on the first line makes everything below it in
# this function unreachable, so the test always passes without doing anything.
def test_cast_vec_ivec3_vec3():
    return  # raise pytest.skip(msg="Cannot do vec3 storage buffers")

    # Exception: SpirV invalid:
    # error: line 23: Structure id 10 decorated as BufferBlock for
    # variable in Uniform storage class must follow standard storage
    # buffer layout rules: member 0 contains an array with stride 12
    # not satisfying alignment to 16

    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(ivec3)),
        data2: ("buffer", 1, Array(vec3)),
    ):
        i = index.x
        data2[i] = vec3(data1[i])

    skip_if_no_wgpu()

    # vec3's are padded to 16 bytes! I guess it's a "feature"
    # https://stackoverflow.com/questions/38172696
    # ... so now I updated my driver and then it works ... sigh
    values1 = [-999999, -100, -4, 1, 4, 100, 32767, 32760, 999999]
    values2 = [-999999, -100, -4, 0, 4, 100, 32767, 0, 999999]

    inp_arrays = {0: (ctypes.c_int32 * len(values1))(*values1)}
    out_arrays = {1: ctypes.c_float * len(values1)}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader, n=3)
    # Either result is accepted: the exact values, or the variant where every
    # fourth slot reads back as 0.
    it_works = iters_equal(out[1], values1)
    it_fails = iters_equal(out[1], values2)
    assert it_works or it_fails  # ah well ...


# ivec2 -> bvec2 -> ivec2. The output is compared against booleans: zero
# inputs are expected as False and every non-zero input as True.
def test_cast_ivec2_bvec2():
    # This triggers the per-element vector conversion
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(ivec2)),
        data2: ("buffer", 1, Array(ivec2)),
    ):
        i = index.x
        tmp = bvec2(data1[i])
        data2[i] = ivec2(tmp)  # ext visible storage cannot be bool

    skip_if_no_wgpu()

    values1 = [-999999, -100, 0, 1, 4, 100, 32767, 32760, 0, 999999]
    values2 = [True, True, False, True, True, True, True, True, False, True]

    inp_arrays = {0: (ctypes.c_int32 * len(values1))(*values1)}
    out_arrays = {1: ctypes.c_int32 * len(values1)}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader, n=5)
    assert iters_equal(out[1], values2)


# Same conversion as ivec2 -> vec2, but the target type is spelled as
# Vector(2, f32) instead of vec2.
# NOTE(review): `Vector` is not among the names imported from pyshader in the
# visible import lines; presumably it is resolved by the shader compiler
# rather than by Python -- confirm.
def test_abstract_types():
    # This triggers the per-element vector conversion
    @python2shader_and_validate
    def compute_shader(
        index: ("input", "GlobalInvocationId", ivec3),
        data1: ("buffer", 0, Array(ivec2)),
        data2: ("buffer", 1, Array(vec2)),
    ):
        i = index.x
        a = data1[i]
        data2[i] = Vector(2, f32)(a)

    skip_if_no_wgpu()

    values1 = [-999999, -100, 0, 1, 4, 100, 32767, 32760, 0, 999999]

    inp_arrays = {0: (ctypes.c_int32 * len(values1))(*values1)}
    out_arrays = {1: ctypes.c_float * len(values1)}
    out = compute_with_buffers(inp_arrays, out_arrays, compute_shader, n=5)
    assert iters_equal(out[1], values1)


# %% Utils for this module


def python2shader_and_validate(func):
    """Convert `func` with pyshader.python2shader, validate it, and return it.

    Asserts that the converted module keeps a reference to the original
    function as `input`, then passes the module and the HASHES table to
    validate_module. The returned module replaces the decorated function.
    """
    m = pyshader.python2shader(func)
    assert m.input is func
    validate_module(m, HASHES)
    return m


def skip_if_no_wgpu():
    """Skip the calling test when testutils reports wgpu cannot be used."""
    if not can_use_wgpu_lib:
        raise pytest.skip(msg="SpirV validated, but not run (cannot use wgpu)")


# Table handed to validate_module, keyed by "<test name>.compute_shader". Each
# value is a pair of hex strings; presumably expected hashes of two stages of
# the generated shader -- TODO confirm against testutils.
HASHES = {
    "test_cast_i32_f32.compute_shader": ("0f97715bd46e44f1", "299f3e15acad5894"),
    "test_cast_u8_f32.compute_shader": ("db621633396816ab", "54681989e3ffef4a"),
    "test_cast_f32_i32.compute_shader": ("9e5240879e527c42", "af778e5bb20d0a2c"),
    "test_cast_f32_f32.compute_shader": ("3e9af418d6a59b2f", "aff51f11276e2d60"),
    "test_cast_f32_f64.compute_shader": ("ffdd61ca192a6af2", "f47fd4c37fd8d306"),
    "test_cast_i64_i16.compute_shader": ("820ac5faf4b9ef5c", "88e02595146b6c8f"),
    "test_cast_i16_u8.compute_shader": ("5160ad257f473715", "945fe0f09c981c3b"),
    "test_cast_vec_ivec2_vec2.compute_shader": ("faaf97ec5191ff47", "c2c40130d690c6e2"),
    "test_cast_vec_any_vec4.compute_shader": ("663a0a466eefd578", "222df0015243ae37"),
    "test_cast_ivec2_bvec2.compute_shader": ("f97e8c25daf81ba2", 
"87360c6b52fbdb65"),229    "test_abstract_types.compute_shader": ("4573e2bdbc07a59a", "bd6ee2724ca16939"),230}231if __name__ == "__main__":...hyd.py
Source:hyd.py  
1#!/usr/bin/env python2"""The script for executing HydPy workflows.3.. _`Python Launcher for Windows`: \4https://docs.python.org/3/using/windows.html#launcher5.. _`here`: https://bitbucket.org/vinay.sajip/pylauncher/downloads6This script is thought to be called from a command line.  After7successful installation of HydPy, you should be able to invoke it from8anywhere on your computer.  You can test this by just typing `hyd.py` into9your command line:10>>> import subprocess11>>> from hydpy import run_subprocess12>>> result = run_subprocess("hyd.py")    # doctest: +ELLIPSIS13Invoking hyd.py without arguments resulted in the following error:14The first positional argument defining the function to be called is missing.15<BLANKLINE>16See the following stack traceback for debugging:17...18If this test example does not work on your machine, you should first make sure19there is a `hyd.py` file in the `Scripts` folder of your Python distribution,20and that the environment variable `Path` is pointing to this folder.21Windows users should also make sure to have the `Python Launcher for Windows`_22installed.  The Python standard distribution contains this launcher, but23other distributions like Anaconda do not.  You can find the suitable24installer `here`_.  As a stopgap, you could directly call Python and pass25the complete path of the `hyd.py` file available in your *HydPy* site-packages26folder as an argument:27>>> import sys28>>> from hydpy.exe import hyd29>>> command = f"{sys.executable} {hyd.__file__}"30>>> from hydpy import repr_31>>> repr_(command)    # doctest: +ELLIPSIS32'...python... .../hydpy/exe/hyd.py'33>>> result = run_subprocess(command)    # doctest: +ELLIPSIS34Invoking hyd.py without arguments resulted in the following error:35The first positional argument defining the function to be called is missing.36...37You are free to redirect output to a log file:38>>> from hydpy import TestIO39>>> TestIO.clear()40>>> with TestIO():41...     
result = run_subprocess("hyd.py logfile=my_log_file.txt")42>>> with TestIO():43...     with open("my_log_file.txt") as logfile:44...         print(logfile.read())    # doctest: +ELLIPSIS45Invoking hyd.py with argument `logfile=my_log_file.txt` resulted in the \46following error:47The first positional argument defining the function to be called is missing.48...49When passing `default` as keyword argument `logfile`, function50|prepare_logfile| generates a default name containing the current51date and time:52>>> import os53>>> with TestIO():54...     result = run_subprocess("hyd.py logfile=default")55>>> with TestIO():56...     for filename in os.listdir("."):57...         if filename.endswith(".log"):58...             print(filename)    # doctest: +ELLIPSIS59hydpy_...log60Without any further arguments, `hyd.py` does not know which function to call:61>>> result = run_subprocess("hyd.py")    # doctest: +ELLIPSIS62Invoking hyd.py without arguments resulted in the following error:63The first positional argument defining the function to be called is missing.64...65The first additional argument must be an available "script function":66>>> result = run_subprocess("hyd.py "67...                         "wrong_argument")    # doctest: +ELLIPSIS68Invoking hyd.py with argument `wrong_argument` resulted in the \69following error:70There is no `wrong_argument` function callable by `hyd.py`.  Choose one of \71the following instead: await_server, exec_commands, exec_script, \72run_doctests, run_simulation, start_server, start_shell, and xml_replace.73...74Further argument requirements depend on the selected "script function":75>>> result = run_subprocess("hyd.py "76...                         "exec_commands")    # doctest: +ELLIPSIS77Invoking hyd.py with argument `exec_commands` resulted in the \78following error:79Function `exec_commands` requires `1` positional arguments (commands), \80but `0` are given.81...82>>> result = run_subprocess("hyd.py "83...                         
"exec_commands "84...                         "first_name "85...                         "second_name")    # doctest: +ELLIPSIS86Invoking hyd.py with arguments `exec_commands, first_name, second_name` \87resulted in the following error:88Function `exec_commands` allows `1` positional arguments (commands), \89but `2` are given \90(first_name and second_name).91...92Optional keyword arguments are supported: (on Linux, we have to escape93the characters "(", ")", ";", and "'" in the following)94>>> import platform95>>> esc = "" if "windows" in platform.platform().lower() else "\\\\"96>>> result = run_subprocess(f"hyd.py "97...                         f"exec_commands "98...                         f"print{esc}(x+y{esc}) "99...                         f"x={esc}'2{esc}' "100...                         f"y={esc}'=1+1{esc}'")101Start to execute the commands ['print(x+y)'] for testing purposes.1022=1+1103Error messages raised by the "script function" itself also find their104way into the console or log file:105>>> result = run_subprocess(    # doctest: +ELLIPSIS106...    f"hyd.py exec_commands "107...    f"raise_RuntimeError{esc}({esc}'it_fails{esc}'{esc})")108Start to execute the commands ["raise_RuntimeError('it_fails')"] for \109testing purposes.110Invoking hyd.py with arguments `exec_commands, raise_RuntimeError('it_fails')` \111resulted in the following error:112it fails113...114The same is true for warning messages:115>>> result = run_subprocess(f"hyd.py "    # doctest: +ELLIPSIS116...                         f"exec_commands "117...                         f"import_warnings{esc};"118...                         f"warnings.warn{esc}({esc}'it_stumbles{esc}'{esc})")119Start to execute the commands ['import_warnings', \120"warnings.warn('it_stumbles')"] for testing purposes...121...UserWarning: it stumbles122And the same is true for printed messages:123>>> result = run_subprocess(f"hyd.py "124...                         f"exec_commands "125...                         
f"print{esc}({esc}'it_works{esc}'{esc})")126Start to execute the commands ["print('it_works')"] \127for testing purposes.128it works129To report the importance level of individual log messages, use the130optional `logstyle` keyword argument:131>>> result = run_subprocess(     # doctest: +ELLIPSIS132...     f"hyd.py exec_commands "133...     f"print{esc}({esc}'it_works{esc}'{esc}){esc};"134...     f"import_warnings{esc};"135...     f"warnings.warn{esc}({esc}'it_stumbles{esc}'{esc}){esc};"136...     f"raise_RuntimeError{esc}({esc}'it_fails{esc}'{esc}) "137...     f"logstyle=prefixed")138info: Start to execute the commands ["print('it_works')", 'import_warnings', \139"warnings.warn('it_stumbles')", "raise_RuntimeError('it_fails')"] for \140testing purposes.141info: it works142warning: ...UserWarning: it stumbles143error: Invoking hyd.py with arguments `exec_commands, \144print('it_works');import_warnings;warnings.warn('it_stumbles');\145raise_RuntimeError('it_fails'), logstyle=prefixed` resulted in \146the following error:147error: it fails148...149So far, only `prefixed` and the default style `plain` are implemented:150>>> result = run_subprocess(f"hyd.py "    # doctest: +ELLIPSIS151...                         f"exec_commands "152...                         f"None "153...                         f"logstyle=missing")154Invoking hyd.py with arguments `exec_commands, None, logstyle=missing` \155resulted in the following error:156The given log file style missing is not available.  
Please choose one \157of the following: plain and prefixed.158...159See the documentation on module |xmltools| for an actually successful160example using the "script function" |run_simulation|.161"""162# import...163# ...from standard-library164import sys165# ...from hydpy166from hydpy.exe import commandtools167def execute() -> None:168    """Call |execute_scriptfunction| of module |commandtools| in case |hyd|169    is the main script.170    >>> from hydpy.exe import hyd171    >>> from unittest import mock172    >>> with mock.patch("hydpy.exe.commandtools.execute_scriptfunction") as fun:173    ...     with mock.patch("sys.exit") as exit_:174    ...         with mock.patch.object(hyd, "__name__", "__not_main__"):175    ...             hyd.execute()176    ...             exit_.called177    False178    >>> for return_value in (None, 0, 2):179    ...     with mock.patch(180    ...             "hydpy.exe.commandtools.execute_scriptfunction",181    ...             return_value=return_value) as fun:182    ...         with mock.patch("sys.exit") as exit_:183    ...             with mock.patch.object(hyd, "__name__", "__main__"):184    ...                 hyd.execute()185    ...                 exit_.call_args186    call(False)187    call(False)188    call(True)189    """190    if __name__ == "__main__":191        sys.exit(bool(commandtools.execute_scriptfunction()))...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
