Source: test_processdispatcher_service.py
...
                                        queued=queued)

        # ensure that procs that request an existing engine id run
        self.client.schedule_process("proc2", self.process_definition_id,
                queueing_mode=proc1_queueing_mode, execution_engine_id="engine1")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        # ensure that procs that don't specify an engine id run
        self.client.schedule_process("proc3", self.process_definition_id,
                queueing_mode=proc1_queueing_mode)
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc3"])

        # now add an engine for proc1 and it should be scheduled
        self.client.node_state("node2", domain_id_from_engine("engine2"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node2", 4)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc1"])

        # now launch another process for engine2. it should be scheduled too
        self.client.schedule_process("proc4", self.process_definition_id,
            queueing_mode=QueueingMode.NEVER, execution_engine_id="engine2")
        self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc4"])

    def test_default_ee(self):
        self.client.node_state("node1", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node1", 4)
        self.client.node_state("node2", domain_id_from_engine("engine2"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node2", 4)

        # fill up all 4 slots on engine1 agent and launch one more proc
        for upid in ['p1', 'p2', 'p3', 'p4', 'p5']:
            self.client.schedule_process(upid, self.process_definition_id,
                queueing_mode=QueueingMode.ALWAYS)

        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
        self.notifier.wait_for_state('p2', ProcessState.RUNNING)
        self.notifier.wait_for_state('p3', ProcessState.RUNNING)
        self.notifier.wait_for_state('p4', ProcessState.RUNNING)

        # p5 should be queued since it is not compatible with engine2
        self.notifier.wait_for_state('p5', ProcessState.WAITING)

        # now schedule p6 directly to engine2
        self.client.schedule_process("p6", self.process_definition_id,
            queueing_mode=QueueingMode.ALWAYS, execution_engine_id="engine2")
        self.notifier.wait_for_state('p6', ProcessState.RUNNING)

        # add another eeagent for engine1, p5 should run
        self.client.node_state("node3", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node3", 4)
        self.notifier.wait_for_state('p5', ProcessState.RUNNING)

    def test_process_engine_map(self):
        def1 = uuid.uuid4().hex
        self.client.create_definition(def1, "dtype",
            executable={"module": "a.b", "class": "C"},
            name="my_process")
        def2 = uuid.uuid4().hex
        self.client.create_definition(def2, "dtype",
            executable={"module": "a.b", "class": "D"},
            name="my_process")
        def3 = uuid.uuid4().hex
        self.client.create_definition(def3, "dtype",
            executable={"module": "a", "class": "B"},
            name="my_process")

        self.client.node_state("node1", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        eeagent1 = self._spawn_eeagent("node1", 4)
        self.client.node_state("node2", domain_id_from_engine("engine2"),
            InstanceState.RUNNING)
        eeagent2 = self._spawn_eeagent("node2", 4)
        self.client.node_state("node3", domain_id_from_engine("engine3"),
            InstanceState.RUNNING)
        eeagent3 = self._spawn_eeagent("node3", 4)

        self.client.schedule_process("proc1", def1)
        self.client.schedule_process("proc2", def2)
        self.client.schedule_process("proc3", def3)

        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)

        proc1 = self.client.describe_process("proc1")
        self.assertEqual(proc1['assigned'], eeagent3.name)
        proc2 = self.client.describe_process("proc2")
        self.assertEqual(proc2['assigned'], eeagent2.name)
        proc3 = self.client.describe_process("proc3")
        self.assertEqual(proc3['assigned'], eeagent1.name)

    def test_node_exclusive(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node, 4)

        exclusive_attr = "hamsandwich"
        queued = []

        proc1_queueing_mode = QueueingMode.ALWAYS

        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        self.client.schedule_process("proc1", self.process_definition_id,
                queueing_mode=proc1_queueing_mode,
                node_exclusive=exclusive_attr)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc1"])

        # Process should be queued, because proc1 has the same attribute
        self.client.schedule_process("proc2", self.process_definition_id,
                queueing_mode=proc1_queueing_mode,
                node_exclusive=exclusive_attr)
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        queued=queued)

        # Now kill the first process, and proc2 should run.
        self.client.terminate_process("proc1")
        queued.remove("proc2")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        # Process should be queued, because proc2 has the same attribute
        self.client.schedule_process("proc3", self.process_definition_id,
                queueing_mode=proc1_queueing_mode,
                node_exclusive=exclusive_attr)
        queued.append("proc3")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        queued=queued)

        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        other_exclusive_attr = "hummussandwich"
        self.client.schedule_process("proc4", self.process_definition_id,
                queueing_mode=proc1_queueing_mode,
                node_exclusive=other_exclusive_attr)
        self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc4"])

        # Now that we've started another node, the waiting process should start
        node = "node2"
        node_properties = dict(engine="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node, 4)
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc3"])

    def test_node_exclusive_bug(self):
        slots = 2

        node_1 = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node_1, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node_1, slots)

        node_2 = "node2"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node_2, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node_2, slots)

        node_3 = "node3"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node_3, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node_3, slots)

        node_4 = "node4"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node_4, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node_4, slots)

        pydap_xattr = "pydap"
        service_gateway_xattr = "service_gateway"
        queueing_mode = QueueingMode.START_ONLY

        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        pydap_xattr_procs = []
        service_gateway_xattr_procs = []

        proc_1 = "proc_1"
        self.client.schedule_process(proc_1, self.process_definition_id,
                queueing_mode=queueing_mode,
                node_exclusive=pydap_xattr)
        pydap_xattr_procs.append(proc_1)

        proc_2 = "proc_2"
        self.client.schedule_process(proc_2, self.process_definition_id,
                queueing_mode=queueing_mode,
                node_exclusive=service_gateway_xattr)
        service_gateway_xattr_procs.append(proc_2)

        proc_3 = "proc_3"
        self.client.schedule_process(proc_3, self.process_definition_id,
                queueing_mode=queueing_mode,
                node_exclusive=service_gateway_xattr)
        service_gateway_xattr_procs.append(proc_3)

        proc_4 = "proc_4"
        self.client.schedule_process(proc_4, self.process_definition_id,
                queueing_mode=queueing_mode,
                node_exclusive=pydap_xattr)
        pydap_xattr_procs.append(proc_4)

        proc_5 = "proc_5"
        self.client.schedule_process(proc_5, self.process_definition_id,
                queueing_mode=queueing_mode,
                node_exclusive=service_gateway_xattr)
        service_gateway_xattr_procs.append(proc_5)

        proc_6 = "proc_6"
        self.client.schedule_process(proc_6, self.process_definition_id,
                queueing_mode=queueing_mode,
                node_exclusive=pydap_xattr)
        pydap_xattr_procs.append(proc_6)

        proc_7 = "proc_7"
        self.client.schedule_process(proc_7, self.process_definition_id,
                queueing_mode=queueing_mode,
                node_exclusive=service_gateway_xattr)
        service_gateway_xattr_procs.append(proc_7)

        proc_8 = "proc_8"
        self.client.schedule_process(proc_8, self.process_definition_id,
                queueing_mode=queueing_mode,
                node_exclusive=pydap_xattr)
        pydap_xattr_procs.append(proc_8)

        for proc in (pydap_xattr_procs + service_gateway_xattr_procs):
            self.notifier.wait_for_state(proc, ProcessState.RUNNING)
            self._wait_assert_pd_dump(self._assert_process_states,
                    ProcessState.RUNNING, [proc])

        self._wait_assert_pd_dump(self._assert_node_exclusive)

        self.client.terminate_process(proc_8)
        self._wait_assert_pd_dump(self._assert_node_exclusive)

    def test_node_exclusive_multiple_eeagents(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node, 4)
        self._spawn_eeagent(node, 4)

        exclusive_attr = "hamsandwich"
        queued = []

        proc1_queueing_mode = QueueingMode.ALWAYS

        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        self.client.schedule_process("proc1", self.process_definition_id,
                queueing_mode=proc1_queueing_mode,
                node_exclusive=exclusive_attr)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc1"])

        # Process should be queued, because proc1 has the same attribute
        self.client.schedule_process("proc2", self.process_definition_id,
                queueing_mode=proc1_queueing_mode,
                node_exclusive=exclusive_attr)
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        queued=queued)

        # Now kill the first process, and proc2 should run.
        self.client.terminate_process("proc1")
        queued.remove("proc2")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        # Process should be queued, because proc2 has the same attribute
        self.client.schedule_process("proc3", self.process_definition_id,
                queueing_mode=proc1_queueing_mode,
                node_exclusive=exclusive_attr)
        queued.append("proc3")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        queued=queued)

        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        other_exclusive_attr = "hummussandwich"
        self.client.schedule_process("proc4", self.process_definition_id,
                queueing_mode=proc1_queueing_mode,
                node_exclusive=other_exclusive_attr)
        self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc4"])

        # Now that we've started another node, the waiting process should start
        node = "node2"
        node_properties = dict(engine="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node, 4)
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc3"])

    def test_queueing(self):
        # submit some processes before there are any resources available
        procs = ["proc1", "proc2", "proc3", "proc4", "proc5"]
        for proc in procs:
            procstate = self.client.schedule_process(proc, self.process_definition_id)
            self.assertEqual(procstate['upid'], proc)

        for proc in procs:
            self.notifier.wait_for_state(proc, ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
                                        ProcessState.WAITING, procs)

        # add 2 nodes and a resource that supports 4 processes
        nodes = ["node1", "node2"]
        domain_id = domain_id_from_engine('engine1')
        for node in nodes:
            self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(nodes[0], 4)

        for proc in procs[:4]:
            self.notifier.wait_for_state(proc, ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                                        ProcessState.RUNNING, procs[:4])
        for proc in procs[4:]:
            self.notifier.wait_for_state(proc, ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
                                        ProcessState.WAITING, procs[4:])

        # stand up a resource on the second node to support the other process
        self._spawn_eeagent(nodes[1], 4)

        # all processes should now be running
        for proc in procs:
            self.notifier.wait_for_state(proc, ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                                        ProcessState.RUNNING, procs)

    def _assert_process_states(self, dump, expected_state, upids):
        for upid in upids:
            process = dump['processes'][upid]
            assert process['state'] == expected_state, "%s: %s, expected %s!" % (
                upid, process['state'], expected_state)

    def test_node_death(self):
        # set up two nodes with 4 slots each
        nodes = ['node1', 'node2']
        domain_id = domain_id_from_engine('engine1')
        for node in nodes:
            self.client.node_state(node, domain_id, InstanceState.RUNNING)
        for node in nodes:
            self._spawn_eeagent(node, 4)

        # 8 total slots are available, schedule 6 processes
        procs = ['proc' + str(i + 1) for i in range(6)]

        # schedule the first process to never restart. it shouldn't come back.
        self.client.schedule_process(procs[0], self.process_definition_id,
            restart_mode=RestartMode.NEVER)

        # and the second process to restart on abnormal termination. it should
        # come back.
        self.client.schedule_process(procs[1], self.process_definition_id,
            restart_mode=RestartMode.ABNORMAL)

        for proc in procs[2:]:
            self.client.schedule_process(proc, self.process_definition_id)

        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        node_counts=[4, 2],
                                        queued_count=0)

        # now kill one node
        log.debug("killing node %s", nodes[0])
        self.client.node_state(nodes[0], domain_id, InstanceState.TERMINATING)

        # 5 processes should be rescheduled. since we have 5 processes and only
        # 4 slots, 1 should be queued
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                  node_counts=[4],
                                  queued_count=1)

        # ensure that the correct process was not rescheduled
        self.notifier.wait_for_state(procs[0], ProcessState.FAILED)

    def _assert_process_distribution(self, dump, nodes=None, node_counts=None,
                                     agents=None, agent_counts=None,
                                     queued=None, queued_count=None,
                                     rejected=None, rejected_count=None):
        # Assert the distribution of processes among nodes.
        # node and agent counts are given as sequences of integers which are
        # not specific to a named node.
        # So specifying node_counts=[4,3] will match as long as you have 4
        # processes assigned to one node and 3 to another, regardless of the
        # node name.
        found_rejected = set()
        found_queued = set()
        found_node = defaultdict(set)
        found_assigned = defaultdict(set)
        for process in dump['processes'].itervalues():
            upid = process['upid']
            assigned = process['assigned']
            if process['state'] == ProcessState.WAITING:
                found_queued.add(upid)
            elif process['state'] == ProcessState.REJECTED:
                found_rejected.add(upid)
            elif process['state'] == ProcessState.RUNNING:
                resource = dump['resources'].get(assigned)
                self.assertIsNotNone(resource)
                node_id = resource['node_id']
                found_node[node_id].add(upid)
                found_assigned[assigned].add(upid)

        print "Queued: %s\nRejected: %s\n" % (queued, rejected)
        print "Found Queued: %s\nFound Rejected: %s\n" % (found_queued, found_rejected)

        if queued is not None:
            self.assertEqual(set(queued), found_queued)
        if queued_count is not None:
            self.assertEqual(len(found_queued), queued_count)
        if rejected is not None:
            self.assertEqual(set(rejected), found_rejected)
        if rejected_count is not None:
            self.assertEqual(len(found_rejected), rejected_count)
        if agents is not None:
            self.assertEqual(set(agents.keys()), set(found_assigned.keys()))
            for ee_id, processes in found_assigned.iteritems():
                self.assertEqual(set(agents[ee_id]), processes)
        if agent_counts is not None:
            assigned_lengths = [len(s) for s in found_assigned.itervalues()]
            # omit zero counts
            agent_counts = [count for count in agent_counts if count != 0]
            # print "%s =?= %s" % (agent_counts, assigned_lengths)
            self.assertEqual(sorted(assigned_lengths), sorted(agent_counts))
        if nodes is not None:
            self.assertEqual(set(nodes.keys()), set(found_node.keys()))
            for node_id, processes in found_node.iteritems():
                self.assertEqual(set(nodes[node_id]), processes)
        if node_counts is not None:
            node_lengths = [len(s) for s in found_node.itervalues()]
            # omit zero counts
            node_counts = [count for count in node_counts if count != 0]
            self.assertEqual(sorted(node_lengths), sorted(node_counts))

    def _assert_node_exclusive(self, dump):
        """assert that processes are distributed in a way consistent
        with the node exclusive properties of those processes
        """
        exclusive_dist = {}
        for proc_id, proc in dump['processes'].iteritems():
            if proc['state'] == '700-TERMINATED':
                continue
            assigned = proc.get('assigned')
            assert assigned is not None, proc
            node_exclusive = proc.get('node_exclusive')
            assert node_exclusive is not None

            if exclusive_dist.get(assigned) is None:
                exclusive_dist[assigned] = []
            exclusive_dist[assigned].append(node_exclusive)
            exclusive_dist[assigned].sort()

        for node, exclusives in exclusive_dist.iteritems():
            assert len(exclusives) == len(set(exclusives))

        exclusive_dist_nodes = {}
        exclusive_dist_resources = {}
        for node_id, node in dump['nodes'].iteritems():
            exclusive_dist_nodes[node_id] = node['node_exclusive']
            exclusive_dist_nodes[node_id].sort()
            for resource in node['resources']:
                exclusive_dist_resources[resource] = node['node_exclusive']
                exclusive_dist_resources[resource].sort()

        for node, exclusives in exclusive_dist_nodes.iteritems():
            assert len(exclusives) == len(set(exclusives))

        print "nodes: %s" % exclusive_dist_nodes
        print "resources: %s" % exclusive_dist_resources
        print "proc: %s" % exclusive_dist

        assert exclusive_dist == exclusive_dist_resources, "%s != %s" % (
            exclusive_dist, exclusive_dist_resources)
        return exclusive_dist

    def test_constraints(self):
        nodes = ['node1', 'node2']
        domain_id = domain_id_from_engine('engine1')
        node1_properties = dict(hat_type="fedora")
        node2_properties = dict(hat_type="bowler")

        self.client.node_state(nodes[0], domain_id, InstanceState.RUNNING,
            node1_properties)
        self._spawn_eeagent(nodes[0], 4)

        proc1_constraints = dict(hat_type="fedora")
        proc2_constraints = dict(hat_type="bowler")

        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=proc1_constraints)
        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=proc2_constraints)

        # proc1 should be running on the node/agent, proc2 queued
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        nodes=dict(node1=["proc1"]),
                                        queued=["proc2"])

        # launch another eeagent that supports proc2's engine_type
        self.client.node_state(nodes[1], domain_id, InstanceState.RUNNING,
            node2_properties)
        self._spawn_eeagent(nodes[1], 4)

        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        nodes=dict(node1=["proc1"],
                                                   node2=["proc2"]),
                                        queued=[])

    def test_queue_mode(self):
        constraints = dict(hat_type="fedora")
        queued = []
        rejected = []

        # Test QueueingMode.NEVER
        proc1_queueing_mode = QueueingMode.NEVER
        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=constraints, queueing_mode=proc1_queueing_mode)

        # proc1 should be rejected
        rejected.append("proc1")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        rejected=rejected)

        # Test QueueingMode.ALWAYS
        proc2_queueing_mode = QueueingMode.ALWAYS
        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=constraints, queueing_mode=proc2_queueing_mode)

        # proc2 should be queued
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        queued=queued)

        # Test QueueingMode.START_ONLY
        proc3_queueing_mode = QueueingMode.START_ONLY
        proc3_restart_mode = RestartMode.ALWAYS
        self.client.schedule_process("proc3", self.process_definition_id,
            constraints=constraints, queueing_mode=proc3_queueing_mode,
            restart_mode=proc3_restart_mode)

        # proc3 should be queued, since it's START_ONLY
        queued.append("proc3")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        queued=queued)

        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node, 4)

        # we created a node, so it should now run
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc3"])

        log.debug("killing node %s", node)
        self._kill_all_eeagents()
        self.client.node_state(node, domain_id, InstanceState.TERMINATING)

        # proc3 should now be rejected, because it's START_ONLY
        queued.remove("proc3")
        rejected.append("proc3")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        rejected=rejected)

        # Test QueueingMode.RESTART_ONLY
        # First, test that it's rejected if it doesn't start right away
        proc4_queueing_mode = QueueingMode.RESTART_ONLY
        proc4_restart_mode = RestartMode.ALWAYS
        self.client.schedule_process("proc4", self.process_definition_id,
            constraints=constraints, queueing_mode=proc4_queueing_mode,
            restart_mode=proc4_restart_mode)

        # proc4 should be rejected, since it's RESTART_ONLY
        rejected.append("proc4")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        rejected=rejected)

        # Second, test that if a proc starts, it'll get queued after it fails
        proc5_queueing_mode = QueueingMode.RESTART_ONLY
        proc5_restart_mode = RestartMode.ALWAYS

        # Start a node
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        self._spawn_eeagent(node, 4)

        self.client.schedule_process("proc5", self.process_definition_id,
            constraints=constraints, queueing_mode=proc5_queueing_mode,
            restart_mode=proc5_restart_mode)
        self.notifier.wait_for_state("proc5", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc5"])

        log.debug("killing node %s", node)
        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
        self._kill_all_eeagents()

        # proc5 should be queued, since it's RESTART_ONLY
        queued.append("proc5")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        queued=queued)

    def test_restart_mode_never(self):
        constraints = dict(hat_type="fedora")

        # Start a node
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        eeagent = self._spawn_eeagent(node, 4)
        # Test RestartMode.NEVER
        proc1_queueing_mode = QueueingMode.ALWAYS
        proc1_restart_mode = RestartMode.NEVER

        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=constraints, queueing_mode=proc1_queueing_mode,
            restart_mode=proc1_restart_mode)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc1"])

        eeagent.fail_process("proc1")
        self.notifier.wait_for_state("proc1", ProcessState.FAILED)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.FAILED, ["proc1"])

    def test_restart_mode_always(self):
        constraints = dict(hat_type="fedora")
        queued = []

        # Start a node
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        eeagent = self._spawn_eeagent(node, 4)

        # Test RestartMode.ALWAYS
        proc2_queueing_mode = QueueingMode.ALWAYS
        proc2_restart_mode = RestartMode.ALWAYS

        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=constraints, queueing_mode=proc2_queueing_mode,
            restart_mode=proc2_restart_mode,
            configuration={'process': {'minimum_time_between_starts': 0.1}})
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        eeagent.exit_process("proc2")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        eeagent.fail_process("proc2")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        log.debug("killing node %s", node)
        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
        self._kill_all_eeagents()

        # proc2 should be queued, since there are no more resources
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        queued=queued)

    def test_restart_mode_abnormal(self):
        constraints = dict(hat_type="fedora")
        queued = []

        # Start a node
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        eeagent = self._spawn_eeagent(node, 4)

        # Test RestartMode.ABNORMAL
        proc2_queueing_mode = QueueingMode.ALWAYS
        proc2_restart_mode = RestartMode.ABNORMAL

        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=constraints, queueing_mode=proc2_queueing_mode,
            restart_mode=proc2_restart_mode)
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        eeagent.fail_process("proc2")

        # This can be very slow on buildbot, hence the long timeout
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING, 60)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        log.debug("killing node %s", node)
        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
        self._kill_all_eeagents()

        # proc2 should be queued, since there are no more resources
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        queued=queued)

        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        eeagent = self._spawn_eeagent(node, 4)

        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=constraints, queueing_mode=proc2_queueing_mode,
            restart_mode=proc2_restart_mode)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)

        eeagent.exit_process("proc1")
        self.notifier.wait_for_state("proc1", ProcessState.EXITED)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.EXITED, ["proc1"])

    def test_start_count(self):
        nodes = ['node1']
        domain_id = domain_id_from_engine('engine1')
        node1_properties = dict(hat_type="fedora")
        self.client.node_state(nodes[0], domain_id, InstanceState.RUNNING,
            node1_properties)
        self._spawn_eeagent(nodes[0], 4)

        proc1_constraints = dict(hat_type="fedora")

        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=proc1_constraints)

        # proc1 should be running on the node/agent
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        nodes=dict(node1=["proc1"]))

        proc = self.store.get_process(None, "proc1")
        self.assertEqual(proc.starts, 1)
        time.sleep(2)

        self.client.restart_process("proc1")

        # proc1 should be running on the node/agent again
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                        nodes=dict(node1=["proc1"]))
        proc = self.store.get_process(None, "proc1")
        self.assertEqual(proc.starts, 2)

    def test_minimum_time_between_starts(self):
        constraints = dict(hat_type="fedora")

        # Start a node
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
                node_properties)
        eeagent = self._spawn_eeagent(node, 4)

        # Test RestartMode.ALWAYS
        queueing_mode = QueueingMode.ALWAYS
        restart_mode = RestartMode.ALWAYS

        default_time_to_throttle = 2
        time_to_throttle = 10

        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=constraints, queueing_mode=queueing_mode,
            restart_mode=restart_mode)

        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=constraints, queueing_mode=queueing_mode,
            restart_mode=restart_mode,
            configuration=minimum_time_between_starts_config(time_to_throttle))

        # Processes should start once without delay
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc1"])
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        # Processes should be restarted once without delay
        eeagent.exit_process("proc1")
        eeagent.exit_process("proc2")
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc1"])
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

        # The second time, proc1 should be throttled for 2s (the default), and
        # proc2 should be throttled for the configured 10s
        eeagent.exit_process("proc1")
        eeagent.exit_process("proc2")
        self.notifier.wait_for_state("proc1", ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.WAITING, ["proc1"])
        self.notifier.wait_for_state("proc2", ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.WAITING, ["proc2"])

        # After waiting a few seconds, proc1 should be restarted
        time.sleep(default_time_to_throttle + 1)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc1"])
        self.notifier.wait_for_state("proc2", ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.WAITING, ["proc2"])

        # After a few more seconds, proc2 should be restarted as well
        time.sleep(time_to_throttle - (default_time_to_throttle + 1) + 1)
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, ["proc2"])

    def test_describe(self):
        self.client.schedule_process("proc1", self.process_definition_id)
        processes = self.client.describe_processes()
        self.assertEqual(len(processes), 1)
        self.assertEqual(processes[0]['upid'], "proc1")

        proc1 = self.client.describe_process("proc1")
        self.assertEqual(proc1['upid'], "proc1")

        self.client.schedule_process("proc2", self.process_definition_id)
        processes = self.client.describe_processes()
        self.assertEqual(len(processes), 2)

        if processes[0]['upid'] == "proc1":
            self.assertEqual(processes[1]['upid'], "proc2")
        elif processes[0]['upid'] == "proc2":
            self.assertEqual(processes[1]['upid'], "proc1")
        else:
            self.fail()

        proc1 = self.client.describe_process("proc1")
        self.assertEqual(proc1['upid'], "proc1")
        proc2 = self.client.describe_process("proc2")
        self.assertEqual(proc2['upid'], "proc2")

    def test_process_exited(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(node, 1)

        proc = "proc1"
        self.client.schedule_process(proc, self.process_definition_id)
        self._wait_assert_pd_dump(self._assert_process_states,
                                  ProcessState.RUNNING, [proc])

        agent = self._get_eeagent_for_process(proc)
        agent.exit_process(proc)
        self._wait_assert_pd_dump(self._assert_process_states,
                                  ProcessState.EXITED, [proc])
        self.notifier.wait_for_state(proc, ProcessState.EXITED)

    def test_neediness(self, process_count=20, node_count=5):
        procs = ["proc" + str(i) for i in range(process_count)]
        for proc in procs:
            procstate = self.client.schedule_process(proc,
                self.process_definition_id)
            self.assertEqual(procstate['upid'], proc)

        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.WAITING, procs)

        for i in range(3):
            # retry this a few times to avoid a race between processes
            # hitting WAITING state and the needs being registered
            try:
                self.epum_client.assert_needs(range(node_count + 1),
                    domain_id_from_engine("engine1"))
                break
            except AssertionError:
                time.sleep(0.01)

        self.epum_client.clear()

        # now provide nodes and resources, processes should start
        nodes = ["node" + str(i) for i in range(node_count)]
        domain_id = domain_id_from_engine('engine1')
        for node in nodes:
            self.client.node_state(node, domain_id, InstanceState.RUNNING)
        for node in nodes:
            self._spawn_eeagent(node, 4)

        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, procs)

        # now kill all processes in a random order
        killlist = list(procs)
        random.shuffle(killlist)
        for proc in killlist:
            self.client.terminate_process(proc)

        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.TERMINATED, procs)

        for i in range(3):
            # retry this a few times to avoid a race between processes
            # hitting WAITING state and the needs being registered
            try:
                self.epum_client.assert_needs(range(node_count + 1),
                    domain_id_from_engine("engine1"))
                break
            except AssertionError:
                time.sleep(0.01)

    def test_definitions(self):
        self.client.create_definition("d1", "t1", "notepad.exe")

        d1 = self.client.describe_definition("d1")
        self.assertEqual(d1['definition_id'], "d1")
        self.assertEqual(d1['definition_type'], "t1")
        self.assertEqual(d1['executable'], "notepad.exe")

        self.client.update_definition("d1", "t1", "notepad2.exe")
        d1 = self.client.describe_definition("d1")
        self.assertEqual(d1['executable'], "notepad2.exe")

        d_list = self.client.list_definitions()
        self.assertIn("d1", d_list)

        self.client.remove_definition("d1")

    def test_reschedule_process(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(node, 1)

        proc = "proc1"

        # start a process that is never restarted automatically.
        self.client.create_process(proc, self.process_definition_id)
        self.client.schedule_process(proc, restart_mode=RestartMode.NEVER)

        self._wait_assert_pd_dump(self._assert_process_states,
                                  ProcessState.RUNNING, [proc])

        agent = self._get_eeagent_for_process(proc)
        agent.exit_process(proc)
        self._wait_assert_pd_dump(self._assert_process_states,
                                  ProcessState.EXITED, [proc])
        self.notifier.wait_for_state(proc, ProcessState.EXITED)

        record = self.client.schedule_process(proc)
        self.assertEqual(record['state'], ProcessState.REQUESTED)
        self.notifier.wait_for_state(proc, ProcessState.RUNNING)

        # now fail the process. it should still be restartable.
        agent.fail_process(proc)

    def test_create_schedule(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(node, 1)

        proc = "proc1"

        # create a process. it should be UNSCHEDULED until we schedule it
        self.client.create_process(proc, self.process_definition_id)
        self._wait_assert_pd_dump(self._assert_process_states,
                                  ProcessState.UNSCHEDULED, [proc])

        # creating again is harmless
        self.client.create_process(proc, self.process_definition_id)

        # now schedule it
        self.client.schedule_process(proc)
        self._wait_assert_pd_dump(self._assert_process_states,
                                  ProcessState.RUNNING, [proc])
        self.notifier.wait_for_state(proc, ProcessState.RUNNING)

        # scheduling again is harmless
        self.client.schedule_process(proc)

    def test_restart_system_boot(self):
        # set up some state in the PD before restart
        self.client.node_state("node1", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node1", 4)

        procs = [('p1', RestartMode.ABNORMAL, None),
                 ('p2', RestartMode.ALWAYS, None),
                 ('p3', RestartMode.NEVER, None),
                 ('p4', RestartMode.ALWAYS, nosystemrestart_process_config()),
                 ('p5', RestartMode.ABNORMAL, None),
                 ('p6', RestartMode.ABNORMAL, nosystemrestart_process_config())]

        # fill up all 4 slots on engine1 agent and launch 2 more procs
        for upid, restart_mode, config in procs:
            self.client.schedule_process(upid, self.process_definition_id,
                queueing_mode=QueueingMode.ALWAYS, restart_mode=restart_mode,
                configuration=config)

        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
        self.notifier.wait_for_state('p2', ProcessState.RUNNING)
        self.notifier.wait_for_state('p3', ProcessState.RUNNING)
        self.notifier.wait_for_state('p4', ProcessState.RUNNING)
        self.notifier.wait_for_state('p5', ProcessState.WAITING)
        self.notifier.wait_for_state('p6', ProcessState.WAITING)

        # now kill PD and eeagents. come back in system restart mode.
        self.stop_pd()
        self._kill_all_eeagents()

        self.store.initialize()
        self.store.set_system_boot(True)
        self.store.shutdown()

        self.start_pd()
        self.store.wait_initialized(timeout=20)

        # some processes should come back pending. others should fail out
        # due to their restart mode flag.
        self.notifier.wait_for_state('p1', ProcessState.UNSCHEDULED_PENDING)
        self.notifier.wait_for_state('p2', ProcessState.UNSCHEDULED_PENDING)
        self.notifier.wait_for_state('p3', ProcessState.TERMINATED)
        self.notifier.wait_for_state('p4', ProcessState.TERMINATED)
        self.notifier.wait_for_state('p5', ProcessState.UNSCHEDULED_PENDING)
        self.notifier.wait_for_state('p6', ProcessState.TERMINATED)

        # add resources back
        self.client.node_state("node1", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node1", 4)

        # now launch a new process to make sure scheduling still works during
        # system boot mode
        self.client.schedule_process("p7", self.process_definition_id)

        # and restart a couple of the dead procs: one TERMINATED and one
        # UNSCHEDULED_PENDING
        self.client.schedule_process("p1")
        self.client.schedule_process("p4")

        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
        self.notifier.wait_for_state('p4', ProcessState.RUNNING)
        self.notifier.wait_for_state('p7', ProcessState.RUNNING)

        # finally, end system boot mode. the remaining 2 UNSCHEDULED_PENDING
        # procs should be scheduled
        self.client.set_system_boot(False)
        self._wait_assert_pd_dump(self._assert_process_distribution,
                                  node_counts=[4],
                                  queued_count=1)

        # one process will end up queued. doesn't matter which
        p2 = self.client.describe_process("p2")
        p5 = self.client.describe_process("p5")
        states = set([p2['state'], p5['state']])
        self.assertEqual(states, set([ProcessState.RUNNING, ProcessState.WAITING]))

    def test_missing_ee(self):
        """test_missing_ee

        Ensure that the PD kills lingering processes on eeagents after
        they've been evacuated.
        """
        # create some fake nodes and tell PD about them
        node_1 = "node1"
        domain_id = domain_id_from_engine("engine4")
        self.client.node_state(node_1, domain_id, InstanceState.RUNNING)

        # PD knows about this node but hasn't gotten a heartbeat yet

        # spawn the eeagents and tell them all to heartbeat
        eeagent_1 = self._spawn_eeagent(node_1, 1)

        def assert_all_resources(state):
            eeagent_nodes = set()
            for resource in state['resources'].itervalues():
                eeagent_nodes.add(resource['node_id'])
            self.assertEqual(set([node_1]), eeagent_nodes)
        self._wait_assert_pd_dump(assert_all_resources)

        time_to_throttle = 0
        self.client.schedule_process("p1", self.process_definition_id,
            execution_engine_id="engine4",
            configuration=minimum_time_between_starts_config(time_to_throttle))

        # Send a heartbeat to show the process is RUNNING, then wait for the
        # doctor to mark the eeagent missing
        time.sleep(1)
        eeagent_1.send_heartbeat()
        self.notifier.wait_for_state('p1', ProcessState.RUNNING, timeout=30)
        self.notifier.wait_for_state('p1', ProcessState.WAITING, timeout=30)

        # Check that the process is still RUNNING on the eeagent, even though
        # the PD has since marked it failed
        eeagent_process = eeagent_1._get_process_with_upid('p1')
        self.assertEqual(eeagent_process['u_pid'], 'p1')
        self.assertEqual(eeagent_process['state'], ProcessState.RUNNING)
        self.assertEqual(eeagent_process['round'], 0)

        # Now send another heartbeat to start getting procs again
        eeagent_1.send_heartbeat()
        self.notifier.wait_for_state('p1', ProcessState.RUNNING, timeout=30)

        eeagent_process = eeagent_1._get_process_with_upid('p1')
        self.assertEqual(eeagent_process['u_pid'], 'p1')
        self.assertEqual(eeagent_process['state'], ProcessState.RUNNING)
        self.assertEqual(eeagent_process['round'], 1)

        # The PD should now have rescheduled the proc, and terminated the
        # lingering process
        self.assertEqual(len(eeagent_1.history), 1)
        terminated_history = eeagent_1.history[0]
        self.assertEqual(terminated_history['u_pid'], 'p1')
        self.assertEqual(terminated_history['state'], ProcessState.TERMINATED)
        self.assertEqual(terminated_history['round'], 0)

    def test_matchmaker_msg_retry(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(node, 1)

        # sneak in and shorten retry time
        self.pd.matchmaker.process_launcher.retry_seconds = 0.5

        proc1 = "proc1"
        proc2 = "proc2"

        with patch.object(self.pd.matchmaker.process_launcher, "resource_client") as mock_resource_client:
            # first process request goes to the mock, not the real client,
            # but hits no error
            self.client.schedule_process(proc1, self.process_definition_id)

            # second process hits an error but that should not cause problems
            mock_resource_client.launch_process.side_effect = Exception("boom!")
            self.client.schedule_process(proc2, self.process_definition_id)

            self._wait_assert_pd_dump(self._assert_process_states,
                                      ProcessState.ASSIGNED, [proc1, proc2])
            self.assertEqual(mock_resource_client.launch_process.call_count, 2)

        # now the resource client works again. those messages should be retried
        self.notifier.wait_for_state(proc1, ProcessState.RUNNING)
        self.notifier.wait_for_state(proc2, ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
                                  ProcessState.RUNNING, [proc1, proc2])


class ProcessDispatcherServiceZooKeeperTests(ProcessDispatcherServiceTests, ZooKeeperTestMixin):

    # this runs all of the ProcessDispatcherService tests with a ZK store

    def setup_store(self):
        self.setup_zookeeper(base_path_prefix="/processdispatcher_service_tests_")
        store = ProcessDispatcherZooKeeperStore(self.zk_hosts,
            self.zk_base_path, use_gevent=self.use_gevent)
        store.initialize()
        return store

    def teardown_store(self):
        if self.store:
            self.store.shutdown()
        self.teardown_zookeeper()
...
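Several of the tests above pass restart-throttling settings through the `configuration` argument. `test_restart_mode_always` builds the dict inline as `{'process': {'minimum_time_between_starts': 0.1}}`, while `test_minimum_time_between_starts` and `test_missing_ee` call a `minimum_time_between_starts_config` helper imported from elsewhere in the test suite. Its definition is not shown in this listing; judging by the inline dict, it presumably just wraps its argument in the same nested structure. A minimal sketch under that assumption:

def minimum_time_between_starts_config(minimum_time_between_starts):
    # Hypothetical reconstruction (helper not shown in this listing):
    # mirrors the inline configuration used in test_restart_mode_always.
    return {'process': {'minimum_time_between_starts': minimum_time_between_starts}}

With this shape, minimum_time_between_starts_config(10) would yield {'process': {'minimum_time_between_starts': 10}}, matching how test_minimum_time_between_starts expects proc2 to be throttled for 10 seconds.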
Source: test_update.py
...67        return "test_update_stateless_job_spec.yaml"68    return "test_stateless_job_spec_k8s.yaml"69def test__create_update(stateless_job, in_place):70    stateless_job.create()71    stateless_job.wait_for_state(goal_state="RUNNING")72    old_pod_infos = stateless_job.query_pods()73    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()74    update = StatelessUpdate(75        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_SPEC76    )77    update.create(in_place=in_place)78    update.wait_for_state(goal_state="SUCCEEDED")79    new_pod_infos = stateless_job.query_pods()80    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()81    assert_pod_id_changed(old_pod_infos, new_pod_infos)82    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)83def test__create_update_add_instances(stateless_job, in_place):84    stateless_job.create()85    stateless_job.wait_for_state(goal_state="RUNNING")86    old_pod_infos = stateless_job.query_pods()87    update = StatelessUpdate(88        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC89    )90    update.create(in_place=in_place)91    update.wait_for_state(goal_state="SUCCEEDED")92    new_pod_infos = stateless_job.query_pods()93    assert len(old_pod_infos) == 394    assert len(new_pod_infos) == 595def test__create_update_update_and_add_instances(stateless_job, in_place):96    stateless_job.create()97    stateless_job.wait_for_state(goal_state="RUNNING")98    old_pod_infos = stateless_job.query_pods()99    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()100    update = StatelessUpdate(101        stateless_job,102        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,103    )104    update.create(in_place=in_place)105    update.wait_for_state(goal_state="SUCCEEDED")106    new_pod_infos = stateless_job.query_pods()107    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()108    assert len(old_pod_infos) == 3109    assert len(new_pod_infos) == 5110    assert_pod_id_changed(old_pod_infos, new_pod_infos)111    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)112def test__create_update_update_start_paused(stateless_job, in_place):113    stateless_job.create()114    stateless_job.wait_for_state(goal_state="RUNNING")115    old_pod_infos = stateless_job.query_pods()116    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()117    update = StatelessUpdate(118        stateless_job,119        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,120        start_paused=True,121    )122    update.create(in_place=in_place)123    update.wait_for_state(goal_state="PAUSED")124    update.resume()125    update.wait_for_state(goal_state="SUCCEEDED")126    new_pod_infos = stateless_job.query_pods()127    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()128    assert len(old_pod_infos) == 3129    assert len(new_pod_infos) == 5130    assert_pod_id_changed(old_pod_infos, new_pod_infos)131    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)132def test__create_update_with_batch_size(stateless_job, in_place):133    stateless_job.create()134    stateless_job.wait_for_state(goal_state="RUNNING")135    old_pod_infos = stateless_job.query_pods()136    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()137    update = StatelessUpdate(138        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_SPEC, batch_size=1139    )140    update.create(in_place=in_place)141   
def test__create_update_add_instances_with_batch_size(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5

def test__create_update_update_and_add_instances_with_batch(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)

def test__create_update_bad_version(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    try:
        update.create(entity_version="1-2-3", in_place=in_place)
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.ABORTED
        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
        return
    raise Exception("entity version mismatch error not received")

def test__pause_update_bad_version(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    try:
        update.pause(entity_version="1-2-3")
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.ABORTED
        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
        return
    raise Exception("entity version mismatch error not received")

def test__resume_update_bad_version(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        start_paused=True,
        batch_size=1,
    )
    update.create(in_place=in_place)
    try:
        update.resume(entity_version="1-2-3")
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.ABORTED
        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
        return
    raise Exception("entity version mismatch error not received")
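# The *_bad_version tests above and below all exercise the same optimistic
# concurrency check: mutating calls carry an entity version, and the server
# responds with ABORTED when it does not match the job's current version. A
# hedged sketch of the happy path; that get_status() exposes the current
# version as version.value is an assumption about the harness API:
def _sketch_update_with_fresh_version(stateless_job, update):
    version = stateless_job.get_status().version.value  # read current version
    update.create(entity_version=version)  # matching version passes the check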
def test__abort_update_bad_version(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    try:
        update.abort(entity_version="1-2-3")
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.ABORTED
        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
        return
    raise Exception("entity version mismatch error not received")

def test__create_update_stopped_job(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    old_pod_states = set()
    for pod_info in old_pod_infos:
        old_pod_states.add(pod_info.spec.pod_name.value)
    stateless_job.stop()
    stateless_job.wait_for_state(goal_state="KILLED")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    stateless_job.start()
    update.wait_for_state(goal_state="SUCCEEDED")
    stateless_job.wait_for_state(goal_state="RUNNING")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
    # Only new instances should be RUNNING
    for pod_info in new_pod_infos:
        if pod_info.spec.pod_name.value in old_pod_states:
            assert pod_info.status.state == pod_pb2.POD_STATE_KILLED
        else:
            assert pod_info.status.state == pod_pb2.POD_STATE_RUNNING

def test__create_update_stopped_tasks(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    stateless_job.stop()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    stateless_job.wait_for_state(goal_state="KILLED")
    stateless_job.start()
    stateless_job.wait_for_state(goal_state="RUNNING")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
def test__create_multiple_consecutive_updates(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update1 = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update1.create(in_place=in_place)
    update2 = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update2.create(in_place=in_place)
    update2.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)

def test__abort_update(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLING_FORWARD")
    update.abort()
    update.wait_for_state(goal_state="ABORTED")

def test__update_reduce_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == 3
    # first increase instances
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 5
    # now reduce instances
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 3
    # now increase back again
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 5

def test__update_reduce_instances_stopped_tasks(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == 3
    # first increase instances
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 5
    # now stop last 2 tasks
    ranges = task_pb2.InstanceRange(to=5)
    setattr(ranges, "from", 3)  # "from" is a Python keyword, so it can't be a kwarg
    stateless_job.stop(ranges=[ranges])
    # now reduce instance count
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 3
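# The setattr() call above is needed because the InstanceRange message has a
# field literally named "from", which collides with the Python keyword. The
# generated protobuf constructor also accepts reserved names via **kwargs, so
# an equivalent helper (hypothetical, not part of this suite) could be:
def _make_instance_range(start, end):
    rng = task_pb2.InstanceRange(**{"from": start, "to": end})
    assert getattr(rng, "from") == start  # reading it back needs getattr too
    return rng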
# test__create_update_with_bad_config tests creating an update with bad config
# without rollback
def test__create_update_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_BAD_SPEC,
        max_failure_instances=3,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
    for pod_info in stateless_job.query_pods():
        assert pod_info.status.state == pod_pb2.POD_STATE_FAILED

# test__create_update_add_instances_with_bad_config
# tests creating an update with bad config and more instances
# without rollback
def test__create_update_add_instances_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = stateless_job.job_spec.instance_count + 3
    update = StatelessUpdate(
        stateless_job,
        batch_size=1,
        updated_job_spec=updated_job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
    # only one instance should be added
    assert (
        len(stateless_job.query_pods())
        == stateless_job.job_spec.instance_count + 1
    )

# test__create_update_reduce_instances_with_bad_config
# tests creating an update with bad config and fewer instances
# without rollback
def test__create_update_reduce_instances_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = stateless_job.job_spec.instance_count - 1
    update = StatelessUpdate(
        stateless_job,
        updated_job_spec=updated_job_spec,
        batch_size=1,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == len(new_pod_infos)

# test__create_update_with_failed_health_check
# tests that an update fails even if the new task state is RUNNING,
# as long as the health check fails
def test__create_update_with_failed_health_check(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")

# test__create_update_to_disable_health_check tests an update which
# disables healthCheck
def test__create_update_to_disable_health_check(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    job.job_spec.default_spec.containers[0].liveness_check.enabled = False
    update = StatelessUpdate(
        job,
        updated_job_spec=job.job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")

# test__create_update_to_enable_health_check tests an update which
# enables healthCheck
def test__create_update_to_enable_health_check(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.job_spec.default_spec.containers[0].liveness_check.enabled = False
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    job.job_spec.default_spec.containers[0].liveness_check.enabled = True
    update = StatelessUpdate(
        job,
        updated_job_spec=job.job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")

# test__create_update_to_unset_health_check tests an update to unset
# health check config
def test__create_update_to_unset_health_check(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        job,
        updated_job_file=UPDATE_STATELESS_JOB_SPEC,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")

# test__create_update_to_set_health_check tests an update to set
# health check config for a job without health check set
def test__create_update_to_set_health_check(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        job,
        updated_job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
# test__create_update_to_change_health_check_config tests an update which
# changes healthCheck
def test__create_update_to_change_health_check_config(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.job_spec.default_spec.containers[0].liveness_check.enabled = False
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    job.job_spec.default_spec.containers[0].liveness_check.initial_interval_secs = 2
    update = StatelessUpdate(
        job,
        updated_job_spec=job.job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")

# test__auto_rollback_update_with_bad_config tests creating an update with bad config
# with rollback
def test__auto_rollback_update_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_BAD_SPEC,
        roll_back_on_failure=True,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLED_BACK")
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)

# test__auto_rollback_update_add_instances_with_bad_config
# tests creating an update with bad config and more instances
# with rollback
def test__auto_rollback_update_add_instances_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = stateless_job.job_spec.instance_count + 3
    update = StatelessUpdate(
        stateless_job,
        updated_job_spec=updated_job_spec,
        roll_back_on_failure=True,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLED_BACK")
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    # no instance should be added
    assert (
        len(stateless_job.query_pods())
        == stateless_job.job_spec.instance_count
    )
    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)

# test__auto_rollback_update_reduce_instances_with_bad_config
# tests creating an update with bad config and fewer instances
# with rollback
def test__auto_rollback_update_reduce_instances_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = stateless_job.job_spec.instance_count - 1
    update = StatelessUpdate(
        stateless_job,
        updated_job_spec=updated_job_spec,
        roll_back_on_failure=True,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLED_BACK")
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    # no instance should be removed
    assert (
        len(stateless_job.query_pods())
        == stateless_job.job_spec.instance_count
    )
    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)

# test__auto_rollback_update_with_failed_health_check
# tests that an update fails even if the new task state is RUNNING,
# as long as the health check fails
def test__auto_rollback_update_with_failed_health_check(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC,
        roll_back_on_failure=True,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLED_BACK")
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)

# test__pause_resume_initialized_update tests pause and resume of
# an update still in INITIALIZED state
def test__pause_resume_initialized_update(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job, batch_size=1, updated_job_file=UPDATE_STATELESS_JOB_SPEC
    )
    update.create(in_place=in_place)
    # immediately pause the update, so the update may still be INITIALIZED
    update.pause()
    update.wait_for_state(goal_state="PAUSED")
    update.resume()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)

# test__pause_resume__update tests pause and resume of a rolling update
def test__pause_resume__update(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job, batch_size=1, updated_job_file=UPDATE_STATELESS_JOB_SPEC
    )
    update.create(in_place=in_place)
    # sleep for 1 sec so update can begin to roll forward
    time.sleep(1)
    update.pause()
    update.wait_for_state(goal_state="PAUSED")
    update.resume()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
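# wait_for_state on jobs and updates comes from the integration harness, not
# from this file. Conceptually it is a poll loop; a minimal sketch, assuming a
# get_state() accessor and ignoring the harness's failed_state handling:
def _sketch_wait_for_state(entity, goal_state, timeout_secs=300, delay=2):
    deadline = time.time() + timeout_secs
    while time.time() < deadline:
        if entity.get_state() == goal_state:
            return
        time.sleep(delay)
    raise AssertionError("timed out waiting for state %s" % goal_state)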
# test_manual_rollback_reduce_instances manually rolls back a running update
# when the instance count is reduced in the rollback.
# Note that manual rollback in peloton is just updating to the
# previous job configuration
def test_manual_rollback_reduce_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create(in_place=in_place)
    # manually rollback the update
    update2 = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
    )
    update2.create(in_place=in_place)
    update2.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == len(new_pod_infos)
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)

# test_manual_rollback_increase_instances manually rolls back a running update
# when the instance count is increased in the rollback
def test_manual_rollback_increase_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    # reduce instance count and then roll it back
    update2 = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
    )
    update2.create(in_place=in_place)
    update3 = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update3.create(in_place=in_place)
    update3.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == len(new_pod_infos)
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)

# test_auto_rollback_reduce_instances rolls back a failed update when
# the instance count is reduced in the rollback.
def test_auto_rollback_reduce_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    job_spec_dump = load_test_config(
        UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC
    )
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    # increase the instance count
    updated_job_spec.instance_count = stateless_job.job_spec.instance_count + 3
    update = StatelessUpdate(
        stateless_job,
        updated_job_spec=updated_job_spec,
        roll_back_on_failure=True,
        max_instance_attempts=1,
        max_failure_instances=1,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLED_BACK")
    assert (
        len(stateless_job.query_pods())
        == stateless_job.job_spec.instance_count
    )
# test_update_create_failure_invalid_spec tests the
# update create failure due to invalid spec in request
def test_update_create_failure_invalid_spec(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_INVALID_SPEC
    )
    try:
        update.create(in_place=in_place)
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.INVALID_ARGUMENT
        return
    raise Exception("job spec validation error not received")

# test_update_killed_job tests updating a killed job.
# The job should be updated but still remain in killed state
def test_update_killed_job(in_place):
    job = StatelessJob(job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC)
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    job.stop()
    job.wait_for_state(goal_state="KILLED")
    update = StatelessUpdate(
        job, updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    assert job.get_spec().instance_count == 3
    assert job.get_status().state == stateless_pb2.JOB_STATE_KILLED

# test_start_job_with_active_update tests
# starting a job with an active update
def test_start_job_with_active_update(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    assert len(stateless_job.query_pods()) == 3
    stateless_job.stop()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    stateless_job.start()
    update.wait_for_state(goal_state="SUCCEEDED")
    stateless_job.wait_for_all_pods_running()
    assert len(stateless_job.query_pods()) == 5

# test_stop_running_job_with_active_update_add_instances tests
# stopping a running job with an active update (add instances)
def test_stop_running_job_with_active_update_add_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    assert len(stateless_job.query_pods()) == 3
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLING_FORWARD")
    stateless_job.stop()
    update.wait_for_state(goal_state="SUCCEEDED")
    assert stateless_job.get_spec().instance_count == 5

# test_stop_running_job_with_active_update_remove_instances tests
# stopping a running job with an active update (remove instances)
def test_stop_running_job_with_active_update_remove_instances(in_place):
    stateless_job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    assert len(stateless_job.query_pods()) == 5
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLING_FORWARD")
    stateless_job.stop()
    update.wait_for_state(goal_state="SUCCEEDED")
    assert stateless_job.get_spec().instance_count == 3
# test_stop_running_job_with_active_update_same_instance_count tests stopping
# a running job with an active update that doesn't change instance count
def test_stop_running_job_with_active_update_same_instance_count(
    stateless_job,
    in_place
):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    stateless_job.job_spec.default_spec.containers[0].command.value = "sleep 100"
    update = StatelessUpdate(
        stateless_job,
        updated_job_spec=stateless_job.job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    stateless_job.stop()
    update.wait_for_state(goal_state="SUCCEEDED")
    assert stateless_job.get_spec().instance_count == 3
    assert (
        stateless_job.get_spec().default_spec.containers[0].command.value
        == "sleep 100"
    )

# test__create_update_before_job_fully_created creates an update
# right after a job is created. It tests the case that a job can be
# updated before it is fully created
def test__create_update_before_job_fully_created(stateless_job, in_place):
    stateless_job.create()
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_SPEC
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    assert (
        stateless_job.get_spec().default_spec.containers[0].command.value
        == "while :; do echo updated; sleep 10; done"
    )

# test__in_place_update_success_rate tests that an in-place update
# succeeds when every daemon is in a healthy state.
# It starts a job with 30 instances, runs the in-place update
# without a batch size, then checks whether any pod is running on an
# unexpected host.
# TODO: Re-enable k8s when it stops being flaky.
# @pytest.mark.k8s
def test__in_place_update_success_rate(stateless_job):
    stateless_job.job_spec.instance_count = 30
    stateless_job.create()
    stateless_job.wait_for_all_pods_running()
    old_pod_infos = stateless_job.query_pods()
    job_spec_dump = load_test_config(update_stateless_job_spec())
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = 30
    if minicluster_type() == "k8s":
        updated_job_spec.default_spec.containers[0].resource.mem_limit_mb = 0.1
    update = StatelessUpdate(stateless_job,
                             updated_job_spec=updated_job_spec,
                             batch_size=0)
    update.create(in_place=True)
    update.wait_for_state(goal_state='SUCCEEDED')
    new_pod_infos = stateless_job.query_pods()
    old_pod_dict = {}
    new_pod_dict = {}
    for old_pod_info in old_pod_infos:
        split_index = old_pod_info.status.pod_id.value.rfind('-')
        pod_name = old_pod_info.status.pod_id.value[:split_index]
        old_pod_dict[pod_name] = old_pod_info.status.host
    for new_pod_info in new_pod_infos:
        split_index = new_pod_info.status.pod_id.value.rfind('-')
        pod_name = new_pod_info.status.pod_id.value[:split_index]
        new_pod_dict[pod_name] = new_pod_info.status.host
    count = 0
    for pod_name, old_host in old_pod_dict.items():
        if new_pod_dict[pod_name] != old_host:
            log.info("%s, prev:%s, cur:%s", pod_name,
                     old_host, new_pod_dict[pod_name])
            count = count + 1
    log.info("total mismatch: %d", count)
    assert count == 0
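# The rfind('-') dance above strips the trailing run ID from a pod ID, i.e. it
# assumes pod IDs look like "<pod-name>-<run-id>". Factoring it out makes the
# host-stability comparison easier to read (helper is hypothetical, not part
# of this suite):
def _pod_name_from_pod_id(pod_id_value):
    return pod_id_value[:pod_id_value.rfind('-')]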
# test__in_place_kill_job_release_host tests that killing a job with an
# ongoing in-place update releases its hosts, so that a second
# update can complete
def test__in_place_kill_job_release_host():
    job1 = StatelessJob(
        job_file="test_stateless_job_spec.yaml",
    )
    job1.create()
    job1.wait_for_state(goal_state="RUNNING")
    job2 = StatelessJob(
        job_file="test_stateless_job_spec.yaml",
    )
    job2.create()
    job2.wait_for_state(goal_state="RUNNING")
    update1 = StatelessUpdate(job1,
                              updated_job_file=UPDATE_STATELESS_JOB_SPEC,
                              batch_size=0)
    update1.create(in_place=True)
    # stop the update
    job1.stop()
    update2 = StatelessUpdate(job2,
                              updated_job_file=UPDATE_STATELESS_JOB_SPEC,
                              batch_size=0)
    update2.create()
    # both updates should complete
    update1.wait_for_state(goal_state="SUCCEEDED")
    update2.wait_for_state(goal_state="SUCCEEDED")

@pytest.mark.skip(reason="flaky test")
def test__in_place_update_host_maintenance(stateless_job, maintenance):
    # add enough instances so each host should have some tasks running
    stateless_job.job_spec.instance_count = 9
    # need extra retry attempts, since the in-place update needs more time
    # to process once the agent is put in maintenance mode
    stateless_job.config = IntegrationTestConfig(
        max_retry_attempts=300,
        pool_file='test_stateless_respool.yaml',
    )
    stateless_job.create()
    stateless_job.wait_for_all_pods_running()
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_SPEC)
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = 9
    update = StatelessUpdate(stateless_job,
                             updated_job_spec=updated_job_spec,
                             batch_size=0)
    update.create(in_place=True)
    # Pick a host that is UP and start maintenance on it
    test_host = get_host_in_state(host_pb2.HOST_STATE_UP)
    resp = maintenance["start"]([test_host])
    assert resp
    wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)
    update.wait_for_state(goal_state="SUCCEEDED")

def test__update_with_sla_aware_host_maintenance(stateless_job, maintenance):
    """
    1. Create a stateless job with 3 instances.
    2. Create a job update that reduces the instance count to 2, adds a
       host-limit-1 constraint and defines an SLA with
       maximum_unavailable_instances=1
    3. Start host maintenance on one of the hosts
    4. The host should transition to DOWN and the update workflow should SUCCEED
    """
    stateless_job.create()
    stateless_job.wait_for_all_pods_running()
    job_spec_dump = load_test_config('test_stateless_job_spec_sla.yaml')
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = 2
    update = StatelessUpdate(stateless_job,
                             updated_job_spec=updated_job_spec,
                             batch_size=1)
    update.create()
    # Pick a host that is UP and start maintenance on it
    test_host = get_host_in_state(host_pb2.HOST_STATE_UP)
    resp = maintenance["start"]([test_host])
    assert resp
    update.wait_for_state(goal_state="SUCCEEDED")
    wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)

def test__update_with_host_maintenance_and_agent_down(stateless_job, maintenance):
    """
    1. Create a large stateless job (one that takes up more than two-thirds of
       the cluster resources) with MaximumUnavailableInstances=2.
    2. Start host maintenance on one of the hosts (say A) having pods of the job.
       MaximumUnavailableInstances=2 ensures that not more than 2 pods are
       unavailable due to host maintenance at a time.
    3. Take down another host which has pods running on it. This will cause
       TASK_LOST to be sent for all pods on the host after 75 seconds.
    4. Start an update to modify the instance spec of one of the pods.
    5. Since TASK_LOST would cause the job SLA to be violated, instances on
       host A should not be killed once the LOST event is received. Verify that
       host A does not transition to DOWN.
    """
    stateless_job.job_spec.instance_count = 30
    stateless_job.job_spec.default_spec.containers[0].resource.cpu_limit = 0.3
    stateless_job.job_spec.sla.maximum_unavailable_instances = 2
    stateless_job.create()
    stateless_job.wait_for_all_pods_running()
    hosts = [h.hostname for h in query_hosts([]).host_infos]
    host_to_task_count = get_host_to_task_count(hosts, stateless_job)
    sorted_hosts = [t[0] for t in sorted(
        host_to_task_count.items(), key=operator.itemgetter(1), reverse=True)]
    # Pick a host that has pods running on it to start maintenance on it.
    test_host = sorted_hosts[0]
    # pick another host which has pods of the job to take down
    host_container = get_container([sorted_hosts[1]])
    try:
        host_container.stop()
        maintenance["start"]([test_host])
        stateless_job.job_spec.instance_spec[10].containers.extend(
            [pod_pb2.ContainerSpec(resource=pod_pb2.ResourceSpec(disk_limit_mb=20))])
        update = StatelessUpdate(stateless_job,
                                 updated_job_spec=stateless_job.job_spec,
                                 batch_size=0)
        update.create()
        update.wait_for_state(goal_state="SUCCEEDED")
        stateless_job.stop()
        wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)
        assert False, 'Host should not transition to DOWN'
    except Exception:
        assert is_host_in_state(test_host, host_pb2.HOST_STATE_DRAINING)
    finally:
        host_container.start()
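# get_host_to_task_count is a harness helper used by both maintenance tests in
# this file to find the busiest hosts. A plausible sketch of it; the
# pod_info.status.host field is taken from its use in
# test__in_place_update_success_rate above, the rest is an assumption:
def _sketch_host_to_task_count(hosts, job):
    counts = {h: 0 for h in hosts}
    for pod_info in job.query_pods():
        host = pod_info.status.host
        if host in counts:
            counts[host] += 1
    return counts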
def test__update_with_host_maintenance__bad_config(stateless_job, maintenance):
    """
    1. Create a stateless job with 6 instances. Wait for all instances to reach
       RUNNING state. This means that there is at least one host with 2 or more
       instances on it.
    2. Start a bad job update with max failure tolerance of 1 and auto-rollback
       disabled.
    3. Start host maintenance on one of the hosts (say host A).
    4. Wait for the update to fail. There should be 2 instances unavailable.
    5. Since 2 instances are already unavailable and
       maximum_unavailable_instances=1, host maintenance should not proceed.
       Verify that host A doesn't transition to DOWN.
    """
    stateless_job.job_spec.sla.maximum_unavailable_instances = 1
    stateless_job.job_spec.instance_count = 6
    stateless_job.create()
    stateless_job.wait_for_all_pods_running()
    hosts = [h.hostname for h in query_hosts([]).host_infos]
    host_to_task_count = get_host_to_task_count(hosts, stateless_job)
    sorted_hosts = [t[0] for t in sorted(
        host_to_task_count.items(), key=operator.itemgetter(1), reverse=True)]
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = 6
    updated_job_spec.sla.maximum_unavailable_instances = 1
    update = StatelessUpdate(
        stateless_job,
        updated_job_spec=updated_job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
        batch_size=2,
    )
    update.create()
    # Pick a host that has pods running on it to start maintenance on it.
    test_host = sorted_hosts[0]
    maintenance["start"]([test_host])
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
    try:
        wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)
        assert False, 'Host should not transition to DOWN'
    except Exception:
        assert is_host_in_state(test_host, host_pb2.HOST_STATE_DRAINING)

# test__create_update_update_job_config tests that updating job-level config
# does not trigger a task restart
def test__create_update_update_job_config(stateless_job):
    stateless_job.create()
    stateless_job.wait_for_all_pods_running()
    old_pod_infos = stateless_job.query_pods()
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_JOB_CONFIG_UPDATE_SPEC
    )
    update.create()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
...

test_provisioner_service.py
Source:test_provisioner_service.py  
...
        launch_id = _new_id()
        node_ids = [_new_id()]
        client.provision(launch_id, node_ids, deployable_type,
            'fake-site1', caller="asterix")
        ok = notifier.wait_for_state(InstanceState.FAILED, node_ids)
        self.assertTrue(ok)
        self.assertTrue(notifier.assure_record_count(1))
        self.assertStoreNodeRecords(InstanceState.FAILED, *node_ids)
        self.assertStoreLaunchRecord(InstanceState.FAILED, launch_id)

    def test_provision_with_vars(self):
        client = self.client
        caller = 'asterix'
        deployable_type = 'empty-with-vars'
        launch_id = _new_id()
        node_ids = [_new_id()]
        vars = {'image_id': 'fake-image'}
        client.provision(launch_id, node_ids, deployable_type,
            'fake-site1', vars=vars, caller=caller)
        self.notifier.wait_for_state(InstanceState.PENDING, node_ids,
            before=self.provisioner.leader._force_cycle)
        self.assertStoreNodeRecords(InstanceState.PENDING, *node_ids)

    def test_provision_with_missing_vars(self):
        client = self.client
        notifier = self.notifier
        caller = 'asterix'
        deployable_type = 'empty-with-vars'
        launch_id = _new_id()
        node_ids = [_new_id()]
        vars = {'foo': 'bar'}
        client.provision(launch_id, node_ids, deployable_type,
            'fake-site1', vars=vars, caller=caller)
        ok = notifier.wait_for_state(InstanceState.FAILED, node_ids)
        self.assertTrue(ok)
        self.assertTrue(notifier.assure_record_count(1))
        self.assertStoreNodeRecords(InstanceState.FAILED, *node_ids)
        self.assertStoreLaunchRecord(InstanceState.FAILED, launch_id)

    def test_provision_broker_error(self):
        client = self.client
        notifier = self.notifier
        deployable_type = 'empty'
        launch_id = _new_id()
        self.context_client.create_error = BrokerError("fake failure")
        node_ids = [_new_id()]
        client.provision(launch_id, node_ids, deployable_type,
            'fake-site1', caller="asterix")
        ok = notifier.wait_for_state(InstanceState.FAILED, node_ids)
        self.assertTrue(ok)
        self.assertTrue(notifier.assure_record_count(1))
        self.assertStoreNodeRecords(InstanceState.FAILED, *node_ids)
        self.assertStoreLaunchRecord(InstanceState.FAILED, launch_id)
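    # Note on the wait_for_state(..., before=...) pattern used throughout this
    # class: FakeProvisionerNotifier polls for the expected instance states,
    # and the optional "before" callable (here the leader's _force_cycle) is
    # invoked ahead of each check to kick the provisioner's control loop
    # instead of waiting out its timer. That is an inference from usage; the
    # notifier is not part of this snippet, but its contract is roughly
    # wait_for_state(state, nodes, before=None, timeout=...) -> bool,
    # returning True once every node in "nodes" has reported "state".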
      self.assertTrue(ok)206        self.assertEqual(len(self.notifier.nodes), len(all_node_ids))207        # we should have not gotten any dupe records yet208        self.assertTrue(self.notifier.assure_record_count(1))209        # empty dump request should dump nothing210        self.client.dump_state([])211        self.assertTrue(self.notifier.assure_record_count(1))212    def test_terminate(self):213        node_ids = []214        for _ in range(10):215            node_id = _new_id()216            node_ids.append(node_id)217            self.client.provision(_new_id(), [node_id], "empty",218                site="fake-site1", caller="asterix")219        self.notifier.wait_for_state(InstanceState.PENDING, node_ids,220            before=self.provisioner.leader._force_cycle)221        for node_id in node_ids:222            node = self.store.get_node(node_id)223            self.driver.set_node_running(node['iaas_id'])224        self.notifier.wait_for_state(InstanceState.STARTED, node_ids,225            before=self.provisioner.leader._force_cycle)226        # terminate half of the nodes then the rest227        first_five = node_ids[:5]228        last_five = node_ids[5:]229        self.client.terminate_nodes(first_five, caller="asterix")230        ok = self.notifier.wait_for_state(InstanceState.TERMINATED, nodes=first_five)231        self.assertTrue(ok)232        self.client.terminate_nodes(last_five, caller="asterix")233        ok = self.notifier.wait_for_state(InstanceState.TERMINATED, nodes=last_five)234        self.assertTrue(ok)235        self.assertEqual(set(node_ids), set(self.notifier.nodes))236        # should be REQUESTED, PENDING, STARTED, TERMINATING and TERMINATED records for each node237        self.assertTrue(self.notifier.assure_record_count(5))238        self.assertEqual(len(self.driver.destroyed),239                         len(node_ids))240    def test_terminate_unknown(self):241        instance_id = _new_id()242        self.client.terminate_nodes([instance_id])243        ok = self.notifier.wait_for_state(InstanceState.TERMINATED, nodes=[instance_id])244        self.assertTrue(ok)245    def test_launch_allocation(self):246        node_id = _new_id()247        self.client.provision(_new_id(), [node_id], "empty",248            site="fake-site1", caller="asterix")249        self.notifier.wait_for_state(InstanceState.PENDING, [node_id],250            before=self.provisioner.leader._force_cycle)251        self.assertStoreNodeRecords(InstanceState.PENDING)252        self.assertEqual(len(self.driver.created), 1)253        libcloud_node = self.driver.created[0]254        self.assertEqual(libcloud_node.size.id, "m1.small")255    def test_launch_many_terminate_all(self):256        all_node_ids = []257        # after the terminate_all, provision requests should be REJECTED258        rejected_node_ids = []259        for _ in range(100):260            node_id = _new_id()261            all_node_ids.append(node_id)262            self.client.provision(_new_id(), [node_id], "empty",263                site="fake-site1", caller="asterix")264        self.notifier.wait_for_state(InstanceState.PENDING, all_node_ids,265            before=self.provisioner.leader._force_cycle)266        self.assertStoreNodeRecords(InstanceState.PENDING, *all_node_ids)267        for node_id in all_node_ids:268            node = self.store.get_node(node_id)269            self.driver.set_node_running(node['iaas_id'])270        self.notifier.wait_for_state(InstanceState.STARTED, all_node_ids,271            
            before=self.provisioner.leader._force_cycle)
        self.assertStoreNodeRecords(InstanceState.STARTED, *all_node_ids)
        log.debug("Expecting %d nodes to be terminated", len(all_node_ids))
        self.assertIs(self.client.terminate_all(), False)
        # future requests should be rejected
        for _ in range(5):
            node_id = _new_id()
            rejected_node_ids.append(node_id)
            self.client.provision(_new_id(), [node_id], "empty",
                site="fake-site1", caller="asterix")
        self.notifier.wait_for_state(InstanceState.TERMINATED, all_node_ids,
            before=self.provisioner.leader._force_cycle, timeout=240)
        self.assertStoreNodeRecords(InstanceState.TERMINATED, *all_node_ids)
        self.notifier.wait_for_state(InstanceState.REJECTED, rejected_node_ids, timeout=240)
        self.assertStoreNodeRecords(InstanceState.REJECTED, *rejected_node_ids)
        self.assertEqual(len(self.driver.destroyed),
                         len(all_node_ids))
        self.assertIs(self.client.terminate_all(), True)
        # now re-enable
        self.client.enable()
        node_id = _new_id()
        log.debug("Launching node %s which should be accepted", node_id)
        self.client.provision(_new_id(), [node_id], "empty",
            site="fake-site1", caller="asterix")
        self.notifier.wait_for_state(InstanceState.PENDING, [node_id],
            before=self.provisioner.leader._force_cycle, timeout=60)
        self.assertStoreNodeRecords(InstanceState.PENDING, node_id)

    def test_describe(self):
        node_ids = []
        for _ in range(3):
            launch_id = _new_id()
            running_launch, running_nodes = make_launch_and_nodes(launch_id, 1,
                InstanceState.RUNNING,
                site="fake-site1", caller=self.default_user)
            self.store.add_launch(running_launch)
            for node in running_nodes:
                self.store.add_node(node)
            node_ids.append(running_nodes[0]['node_id'])
        log.debug("requesting node descriptions")
        all_nodes = self.client.describe_nodes()
        self.assertEqual(len(all_nodes), len(node_ids))
        one_node = self.client.describe_nodes([node_ids[0]])
        self.assertEqual(len(one_node), 1)
        self.assertEqual(one_node[0]['node_id'], node_ids[0])

    def test_multiuser(self):
        """Test that nodes started by one user can't be modified by
        another user
        """
        permitted_user = "asterix"
        disallowed_user = "cacaphonix"
        client = self.client
        deployable_type = 'empty'
        launch_id = _new_id()
        node_ids = [_new_id()]
        vars = {'image_id': 'fake-image'}
        client.provision(launch_id, node_ids, deployable_type,
            'fake-site1', vars=vars, caller=permitted_user)
        self.notifier.wait_for_state(InstanceState.PENDING, node_ids,
            before=self.provisioner.leader._force_cycle)
        self.assertStoreNodeRecords(InstanceState.PENDING, *node_ids)
        # Test describe
        permitted_nodes = client.describe_nodes(caller=permitted_user)
        self.assertEqual(len(permitted_nodes), len(node_ids))
        disallowed_nodes = client.describe_nodes(caller=disallowed_user)
        self.assertEqual(len(disallowed_nodes), 0)
        # Test terminate
        client.terminate_nodes(node_ids, caller=disallowed_user)
        terminate_timed_out = False
        try:
            self.notifier.wait_for_state(InstanceState.TERMINATED, node_ids,
                before=self.provisioner.leader._force_cycle, timeout=2)
        except Exception:
            terminate_timed_out = True
        self.assertTrue(terminate_timed_out,
                msg="Terminate worked with non-matching user")
        client.terminate_nodes(node_ids, caller=permitted_user)
        self.notifier.wait_for_state(InstanceState.TERMINATED, node_ids,
            before=self.provisioner.leader._force_cycle, timeout=2)
        self.assertStoreNodeRecords(InstanceState.TERMINATED, *node_ids)

    def test_record_reaper(self):
        launch_id1 = _new_id()
        launch_id2 = _new_id()
        now = time.time()
        node1 = make_node(launch_id1, InstanceState.TERMINATED, caller=self.default_user,
                          state_changes=[(InstanceState.TERMINATED, now - self.record_reaping_max_age - 1)])
        node2 = make_node(launch_id1, InstanceState.FAILED, caller=self.default_user,
                          state_changes=[(InstanceState.FAILED, now - self.record_reaping_max_age - 1)])
        node3 = make_node(launch_id1, InstanceState.REJECTED, caller=self.default_user,
                          state_changes=[(InstanceState.REJECTED, now - self.record_reaping_max_age - 1)])
        nodes1 = [node1, node2, node3]
        launch1 = make_launch(launch_id1, InstanceState.RUNNING, nodes1, caller=self.default_user)
        node4 = make_node(launch_id2, InstanceState.RUNNING, caller=self.default_user,
                          state_changes=[(InstanceState.RUNNING, now - self.record_reaping_max_age - 1)])
        node5 = make_node(launch_id2, InstanceState.TERMINATED, caller=self.default_user,
                          state_changes=[(InstanceState.TERMINATED, now - self.record_reaping_max_age - 1)])
        nodes2 = [node4, node5]
        launch2 = make_launch(launch_id2, InstanceState.RUNNING, nodes2, caller=self.default_user)
        self.store.add_launch(launch1)
        for node in nodes1:
            self.store.add_node(node)
        self.store.add_launch(launch2)
        for node in nodes2:
            self.store.add_node(node)
        # Wait a second for records to get written
        time.sleep(1)
        # Force a record reaping cycle
        self.provisioner.leader._force_record_reaping()
        # Check that the first launch is completely removed
        node_ids1 = [node['node_id'] for node in nodes1]
        self.assertNoStoreNodeRecords(*node_ids1)
        self.assertNoStoreLaunchRecord(launch_id1)
        # Check that the second launch is still here but with only the running node
        self.assertStoreNodeRecords(InstanceState.RUNNING, node4['node_id'])
        self.assertStoreLaunchRecord(InstanceState.RUNNING, launch_id2)
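# The record reaper exercised above apparently drops node records that have
# sat in a terminal state (TERMINATED, FAILED, REJECTED) longer than
# record_reaping_max_age, and a launch record once all of its nodes are gone.
# A minimal sketch of that age predicate, assuming the (state, timestamp)
# tuples in state_changes as constructed by make_node above:
def _sketch_is_reapable(node, now, max_age, terminal_states):
    return any(state in terminal_states and now - ts > max_age
               for state, ts in node['state_changes'])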
caller="asterix")401        self.notifier.wait_for_state(InstanceState.PENDING, all_node_ids,402            before=self.provisioner.leader._force_cycle)403        self.assertStoreNodeRecords(InstanceState.PENDING, *all_node_ids)404        for node_id in all_node_ids:405            node = self.store.get_node(node_id)406            self.driver.set_node_running(node['iaas_id'])407        self.notifier.wait_for_state(InstanceState.RUNNING, all_node_ids,408            before=self.provisioner.leader._force_cycle)409        self.assertStoreNodeRecords(InstanceState.RUNNING, *all_node_ids)410class ProvisionerZooKeeperServiceTest(ProvisionerServiceTest, ZooKeeperTestMixin):411    # this runs all of the ProvisionerServiceTest tests wih a ZK store412    def setup_store(self):413        self.setup_zookeeper(base_path_prefix="/provisioner_service_tests_")414        store = ProvisionerZooKeeperStore(self.zk_hosts, self.zk_base_path, use_gevent=self.use_gevent)415        store.initialize()416        return store417    def teardown_store(self):...test_v2v_migrations_single_vcenter.py
Source:test_v2v_migrations_single_vcenter.py  
...
        infra_map=mapping_data_vm_obj_mini.infra_mapping_data.get("name"),
        vm_list=mapping_data_vm_obj_mini.vm_list,
        target_provider=provider
    )
    assert migration_plan.wait_for_state("Started")
    assert migration_plan.wait_for_state("In_Progress")
    assert migration_plan.wait_for_state("Completed")
    assert migration_plan.wait_for_state("Successful")
    # check power state on migrated VM
    migrated_vm = get_migrated_vm(src_vm, provider)
    assert power_state in migrated_vm.mgmt.state
    # check tags
    collection = provider.appliance.provider_based_collection(provider)
    vm_obj = collection.instantiate(migrated_vm.name, provider)
    owner_tag = None
    for t in vm_obj.get_tags():
        if tag.display_name in t.display_name:
            owner_tag = t
    assert owner_tag is not None and tag.display_name in owner_tag.display_name
    # If Never is not there, that means retirement is set.
    assert 'Never' not in vm_obj.retirement_date
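The owner-tag scan above reassigns owner_tag inside a loop; the same lookup can be expressed with next() over a generator. A sketch under the same assumption the test makes, namely that get_tags() yields objects with a display_name attribute:

def find_owner_tag(vm_obj, tag):
    """Return the first applied tag whose display_name contains the
    expected tag's display_name, or None if no such tag is applied."""
    return next(
        (t for t in vm_obj.get_tags() if tag.display_name in t.display_name),
        None,
    )

# usage: owner_tag = find_owner_tag(vm_obj, tag)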
@pytest.mark.parametrize('source_type, dest_type, template_type',
                         [
                             ['nfs', 'nfs', [Templates.RHEL7_MINIMAL,
                                             Templates.RHEL7_MINIMAL,
                                             Templates.RHEL7_MINIMAL,
                                             Templates.RHEL7_MINIMAL]
                              ]
                         ])
def test_multi_host_multi_vm_migration(request, appliance, provider,
                                       source_type, dest_type, template_type,
                                       mapping_data_multiple_vm_obj_single_datastore):
    """
    Polarion:
        assignee: sshveta
        caseimportance: medium
        caseposneg: positive
        testtype: functional
        startsin: 5.10
        casecomponent: V2V
        initialEstimate: 1h
    """
    infrastructure_mapping_collection = appliance.collections.v2v_infra_mappings
    mapping_data = mapping_data_multiple_vm_obj_single_datastore.infra_mapping_data
    mapping = infrastructure_mapping_collection.create(**mapping_data)
    migration_plan_collection = appliance.collections.v2v_migration_plans
    migration_plan = migration_plan_collection.create(
        name=fauxfactory.gen_alphanumeric(start="plan_"),
        description=fauxfactory.gen_alphanumeric(15, start="plan_desc_"),
        infra_map=mapping.name,
        vm_list=mapping_data_multiple_vm_obj_single_datastore.vm_list,
        target_provider=provider
    )
    assert migration_plan.wait_for_state("Started")
    request_details_list = migration_plan.get_plan_vm_list(wait_for_migration=False)
    vms = request_details_list.read()
    # testing multi-host utilization
    match = ['Converting', 'Migrating']

    def _is_migration_started(vm):
        return any(string in request_details_list.get_message_text(vm)
                   for string in match)

    for vm in vms:
        wait_for(func=_is_migration_started, func_args=[vm],
                 message="migration has not started for all VMs", delay=5, num_sec=300)
    if provider.one_of(OpenStackProvider):
        host_creds = provider.appliance.collections.openstack_nodes.all()
    else:
        host_creds = provider.hosts.all()
    hosts_dict = {key.name: [] for key in host_creds}
    for vm in vms:
        popup_text = request_details_list.read_additional_info_popup(vm)
        # open_additional_info_popup also closes an already-opened popup in our case
        request_details_list.open_additional_info_popup(vm)
        if popup_text['Conversion Host'] in hosts_dict:
            hosts_dict[popup_text['Conversion Host']].append(vm)
    for host in hosts_dict:
        if len(hosts_dict[host]) > 0:
            logger.info("Host: {} is migrating VMs: {}".format(host, hosts_dict[host]))
    assert migration_plan.wait_for_state("In_Progress")
    assert migration_plan.wait_for_state("Completed")
    assert migration_plan.wait_for_state("Successful")
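test_multi_host_multi_vm_migration pre-seeds hosts_dict with every known host so it can tell which conversion host picked up each VM. The same grouping can be done with collections.defaultdict; a standalone sketch with hypothetical VM and host names standing in for the popup data:

from collections import defaultdict

# hypothetical VM -> conversion-host data read from the popups
vm_to_host = {"vm1": "host-a", "vm2": "host-a", "vm3": "host-b"}

hosts_dict = defaultdict(list)
for vm, conversion_host in vm_to_host.items():
    hosts_dict[conversion_host].append(vm)

for host, vms in hosts_dict.items():
    print("Host: {} is migrating VMs: {}".format(host, vms))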
def test_migration_special_char_name(appliance, provider, request,
                                     mapping_data_vm_obj_mini):
    """Tests migration where the name of the migration plan is comprised of special
       non-alphanumeric characters, such as '@#$(&#@('.
    Polarion:
        assignee: sshveta
        caseimportance: medium
        caseposneg: positive
        testtype: functional
        startsin: 5.10
        casecomponent: V2V
        initialEstimate: 1h
    """
    migration_plan_collection = appliance.collections.v2v_migration_plans
    migration_plan = migration_plan_collection.create(
        name=fauxfactory.gen_alphanumeric(start="plan_"),
        description=fauxfactory.gen_alphanumeric(start="plan_desc_"),
        infra_map=mapping_data_vm_obj_mini.infra_mapping_data.get("name"),
        vm_list=mapping_data_vm_obj_mini.vm_list,
        target_provider=provider
    )
    assert migration_plan.wait_for_state("Started")
    assert migration_plan.wait_for_state("In_Progress")
    assert migration_plan.wait_for_state("Completed")
    assert migration_plan.wait_for_state("Successful")
    # validate that the MAC address matches between source and target VMs
    src_vm = mapping_data_vm_obj_mini.vm_list[0]
    migrated_vm = get_migrated_vm(src_vm, provider)

    @request.addfinalizer
    def _cleanup():
        cleanup_target(provider, migrated_vm)

    assert src_vm.mac_address == migrated_vm.mac_address


def test_migration_long_name(request, appliance, provider, source_provider):
    """Test that a VM name with 64 characters migrates successfully
    Polarion:
        assignee: sshveta
        initialEstimate: 1/2h
        casecomponent: V2V
    """
    source_datastores_list = source_provider.data.get("datastores", [])
    source_datastore = [d.name for d in source_datastores_list if d.type == "nfs"][0]
    collection = appliance.provider_based_collection(source_provider)
    # the following builds a VM name of exactly 64 characters
    vm_name = "{vm_name}{extra_words}".format(vm_name=random_vm_name(context="v2v"),
                                              extra_words=fauxfactory.gen_alpha(51))
    template = _get_template(source_provider, Templates.RHEL7_MINIMAL)
    vm_obj = collection.instantiate(
        name=vm_name,
        provider=source_provider,
        template_name=template.name,
    )
    vm_obj.create_on_provider(
        timeout=2400,
        find_in_cfme=True,
        allow_skip="default",
        datastore=source_datastore)
    request.addfinalizer(lambda: vm_obj.cleanup_on_provider())
    mapping_data = infra_mapping_default_data(source_provider, provider)
    infrastructure_mapping_collection = appliance.collections.v2v_infra_mappings
    mapping = infrastructure_mapping_collection.create(**mapping_data)

    @request.addfinalizer
    def _cleanup():
        infrastructure_mapping_collection.delete(mapping)

    migration_plan_collection = appliance.collections.v2v_migration_plans
    migration_plan = migration_plan_collection.create(
        name=fauxfactory.gen_alphanumeric(20, start="long_name_"),
        description=fauxfactory.gen_alphanumeric(25, start="desc_long_name_"),
        infra_map=mapping.name,
        vm_list=[vm_obj],
        target_provider=provider
    )
    assert migration_plan.wait_for_state("Started")
    assert migration_plan.wait_for_state("In_Progress")
    assert migration_plan.wait_for_state("Completed")
    assert migration_plan.wait_for_state("Successful")
    migrated_vm = get_migrated_vm(vm_obj, provider)
    assert vm_obj.mac_address == migrated_vm.mac_address


@pytest.mark.parametrize('source_type, dest_type, template_type',
                         [['nfs', 'nfs', Templates.RHEL7_MINIMAL]])
def test_migration_with_edited_mapping(request, appliance, source_provider, provider,
                                       source_type, dest_type, template_type,
                                       mapping_data_vm_obj_single_datastore):
    """
    Test migration with an edited infrastructure mapping.
    Polarion:
        assignee: sshveta
        caseimportance: medium
        caseposneg: positive
        testtype: functional
        startsin: 5.10
        casecomponent: V2V
        initialEstimate: 1h
    """
    infrastructure_mapping_collection = appliance.collections.v2v_infra_mappings
    mapping_data = infra_mapping_default_data(source_provider, provider)
    mapping = infrastructure_mapping_collection.create(**mapping_data)
    mapping.update(mapping_data_vm_obj_single_datastore.infra_mapping_data)
    # vm_list holds only one VM object, hence [0]
    src_vm_obj = mapping_data_vm_obj_single_datastore.vm_list[0]
    migration_plan_collection = appliance.collections.v2v_migration_plans
    migration_plan = migration_plan_collection.create(
        name=fauxfactory.gen_alphanumeric(start="plan_"),
        description=fauxfactory.gen_alphanumeric(15, start="plan_desc_"),
        infra_map=mapping.name,
        vm_list=mapping_data_vm_obj_single_datastore.vm_list,
        target_provider=provider)
    assert migration_plan.wait_for_state("Started")
    assert migration_plan.wait_for_state("In_Progress")
    assert migration_plan.wait_for_state("Completed")
    assert migration_plan.wait_for_state("Successful")
    migrated_vm = get_migrated_vm(src_vm_obj, provider)

    @request.addfinalizer
    def _cleanup():
        infrastructure_mapping_collection.delete(mapping)
        cleanup_target(provider, migrated_vm)

    assert src_vm_obj.mac_address == migrated_vm.mac_address
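test_migration_long_name above pads a generated VM name with 51 random letters to hit exactly 64 characters, which only works if the generated prefix is 13 characters long. A standalone sketch of that arithmetic, with an invented 13-character prefix standing in for random_vm_name(context="v2v") and a stand-in for fauxfactory.gen_alpha:

import random
import string


def gen_alpha(length):
    """Stand-in for fauxfactory.gen_alpha: random ASCII letters."""
    return "".join(random.choices(string.ascii_letters, k=length))


# "test-v2v-" plus four random letters: 9 + 4 = 13 characters (assumed prefix shape)
prefix = "test-v2v-" + gen_alpha(4)
vm_name = "{vm_name}{extra_words}".format(vm_name=prefix, extra_words=gen_alpha(51))
assert len(vm_name) == 64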
@pytest.mark.tier(3)
@pytest.mark.parametrize(
    "source_type, dest_type, template_type",
    [["nfs", "nfs", Templates.UBUNTU16_TEMPLATE]])
def test_migration_restart(request, appliance, provider,
                           source_type, dest_type, template_type,
                           mapping_data_vm_obj_single_datastore):
    """
    Test migration by restarting evmserverd in the middle of the process
    Polarion:
        assignee: sshveta
        initialEstimate: 1h
        caseimportance: medium
        caseposneg: positive
        testtype: functional
        startsin: 5.10
        casecomponent: V2V
    """
    infrastructure_mapping_collection = appliance.collections.v2v_infra_mappings
    mapping_data = mapping_data_vm_obj_single_datastore.infra_mapping_data
    mapping = infrastructure_mapping_collection.create(**mapping_data)
    src_vm_obj = mapping_data_vm_obj_single_datastore.vm_list[0]
    migration_plan_collection = appliance.collections.v2v_migration_plans
    migration_plan = migration_plan_collection.create(
        name=fauxfactory.gen_alphanumeric(start="plan_"),
        description=fauxfactory.gen_alphanumeric(15, start="plan_desc_"),
        infra_map=mapping.name,
        target_provider=provider,
        vm_list=mapping_data_vm_obj_single_datastore.vm_list,
    )
    assert migration_plan.wait_for_state("Started")
    # reboot the appliance once the disk migration has been running for 240 seconds
    migration_plan.in_progress(plan_elapsed_time=240)
    appliance.restart_evm_rude()
    appliance.wait_for_miq_ready()
    try:
        assert migration_plan.wait_for_state("In_Progress")
    except WebDriverException:
        pass
    assert migration_plan.wait_for_state("Completed")
    assert migration_plan.wait_for_state("Successful")
    migrated_vm = get_migrated_vm(src_vm_obj, provider)

    @request.addfinalizer
    def _cleanup():
        infrastructure_mapping_collection.delete(mapping)
        cleanup_target(provider, migrated_vm)

    assert src_vm_obj.mac_address == migrated_vm.mac_address
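After restart_evm_rude() the browser session may be stale, so test_migration_restart wraps its first post-restart check in try/except WebDriverException and relies on the stricter checks that follow. That tolerate-then-verify idea generalizes to a small retry helper; a sketch with the helper name and defaults invented here:

def tolerate_transient(func, exceptions, retries=1):
    """Call func(), retrying up to `retries` times if one of the given
    transient exceptions fires; re-raise once retries are exhausted."""
    for attempt in range(retries + 1):
        try:
            return func()
        except exceptions:
            if attempt == retries:
                raise

# usage against the test above (hypothetical):
# tolerate_transient(lambda: migration_plan.wait_for_state("In_Progress"),
#                    (WebDriverException,))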
@pytest.mark.tier(2)
def test_if_no_password_is_exposed_in_logs_during_migration(appliance, source_provider, provider,
                                                            request, mapping_data_vm_obj_mini):
    """
    title: OSP: Test if no password is exposed in logs during migration
    Polarion:
        assignee: mnadeem
        casecomponent: V2V
        initialEstimate: 1/8h
        startsin: 5.10
        subcomponent: OSP
        testSteps:
            1. Create infrastructure mapping for VMware to OSP/RHV
            2. Create migration plan
            3. Start migration
        expectedResults:
            1. Mapping created and visible in UI
            2.
            3. Logs should not show passwords during migration
    """
    cred = []
    ssh_key_name = source_provider.data['private-keys']['vmware-ssh-key']['credentials']
    cred.append(credentials[source_provider.data.get("credentials")]["password"])
    cred.append(credentials[ssh_key_name]["password"])
    cred.append(credentials[provider.data.get("credentials")]["password"])
    if provider.one_of(OpenStackProvider):
        osp_key_name = provider.data['private-keys']['conversion_host_ssh_key']['credentials']
        cred.append(credentials[osp_key_name]["password"])
    automation_log = LogValidator("/var/www/miq/vmdb/log/automation.log",
                                  failure_patterns=cred, hostname=appliance.hostname)
    evm_log = LogValidator("/var/www/miq/vmdb/log/evm.log",
                           failure_patterns=cred, hostname=appliance.hostname)
    automation_log.start_monitoring()
    evm_log.start_monitoring()
    migration_plan_collection = appliance.collections.v2v_migration_plans
    migration_plan = migration_plan_collection.create(
        name=fauxfactory.gen_alphanumeric(start="plan_"),
        description=fauxfactory.gen_alphanumeric(start="plan_desc_"),
        infra_map=mapping_data_vm_obj_mini.infra_mapping_data.get("name"),
        vm_list=mapping_data_vm_obj_mini.vm_list,
        target_provider=provider
    )
    assert migration_plan.wait_for_state("Started")
    assert migration_plan.wait_for_state("In_Progress")
    assert migration_plan.wait_for_state("Completed")
    assert migration_plan.wait_for_state("Successful")
    src_vm = mapping_data_vm_obj_mini.vm_list[0]
    migrated_vm = get_migrated_vm(src_vm, provider)

    @request.addfinalizer
    def _cleanup():
        cleanup_target(provider, migrated_vm)
        migration_plan.delete_completed_plan()

    # Check the log files for any exposed password
    assert automation_log.validate()
...
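The password test registers each credential as a failure pattern, so LogValidator's validate() fails if any secret shows up in the monitored logs. The core of that check reduces to a substring scan over a log file; a minimal local sketch, with the helper name, path, and patterns invented:

def find_leaked_secrets(log_path, failure_patterns):
    """Return (line_number, pattern) pairs for every forbidden pattern
    found in the log file."""
    hits = []
    with open(log_path, encoding="utf-8", errors="replace") as log:
        for lineno, line in enumerate(log, start=1):
            hits.extend((lineno, pattern) for pattern in failure_patterns
                        if pattern in line)
    return hits

# usage (hypothetical path and secrets):
# assert not find_leaked_secrets("/var/www/miq/vmdb/log/automation.log", cred)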
