How to use the wait_for_state method in Playwright Python

Best Python code snippets using playwright-python

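For reference, Playwright's Python API does not expose a method literally named wait_for_state: the closest built-in calls are page.wait_for_load_state() for page lifecycle states and locator.wait_for(state=...) for element states. The snippet below is a minimal sketch of both calls (the URL and selector are placeholders chosen for illustration):

from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto("https://example.com")  # placeholder URL
    # Wait for a page lifecycle state: "load", "domcontentloaded" or "networkidle"
    page.wait_for_load_state("networkidle")
    # Wait for an element state: "attached", "detached", "visible" or "hidden"
    page.locator("h1").wait_for(state="visible")
    browser.close()

The snippets that follow come from other Python projects; they show the same poll-until-state idea that carries over directly to Playwright tests.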

TestAttachResume.py

Source: TestAttachResume.py (GitHub)

1"""
2Test process attach/resume.
3"""
4
5import os, time
6import unittest2
7import lldb
8from lldbtest import *
9import lldbutil
10
11exe_name = "AttachResume"  # Must match Makefile
12
13class AttachResumeTestCase(TestBase):
14
15    mydir = TestBase.compute_mydir(__file__)
16
17    @expectedFailureFreeBSD('llvm.org/pr19310')
18    @dwarf_test
19    def test_attach_continue_interrupt_detach(self):
20        """Test attach/continue/interrupt/detach"""
21        self.buildDwarf()
22        self.process_attach_continue_interrupt_detach()
23
24    @expectedFailureLinux('llvm.org/pr19478')
25    def process_attach_continue_interrupt_detach(self):
26        """Test attach/continue/interrupt/detach"""
27
28        exe = os.path.join(os.getcwd(), exe_name)
29
30        popen = self.spawnSubprocess(exe)
31        self.addTearDownHook(self.cleanupSubprocesses)
32
33        self.runCmd("process attach -p " + str(popen.pid))
34
35        self._state = 0
36        def process_events():
37            event = lldb.SBEvent()
38            while self.dbg.GetListener().GetNextEvent(event):
39                self._state = lldb.SBProcess.GetStateFromEvent(event)
40
41        # using process.GetState() does not work: llvm.org/pr16172
42        def wait_for_state(s, timeout=5):
43            t = 0
44            period = 0.1
45            while self._state != s:
46                process_events()
47                time.sleep(period)
48                t += period
49                if t > timeout:
50                    return False
51            return True
52
53        self.setAsync(True)
54
55        self.runCmd("c")
56        self.assertTrue(wait_for_state(lldb.eStateRunning),
57            'Process not running after continue')
58
59        self.runCmd("process interrupt")
60        self.assertTrue(wait_for_state(lldb.eStateStopped),
61            'Process not stopped after interrupt')
62
63        # be sure to continue/interrupt/continue (r204504)
64        self.runCmd("c")
65        self.assertTrue(wait_for_state(lldb.eStateRunning),
66            'Process not running after continue')
67
68        self.runCmd("process interrupt")
69        self.assertTrue(wait_for_state(lldb.eStateStopped),
70            'Process not stopped after interrupt')
71
72        # check that this breakpoint is auto-cleared on detach (r204752)
73        self.runCmd("br set -f main.cpp -l 12")
74
75        self.runCmd("c")
76        self.assertTrue(wait_for_state(lldb.eStateRunning),
77            'Process not running after continue')
78
79        self.assertTrue(wait_for_state(lldb.eStateStopped),
80            'Process not stopped after breakpoint')
81        self.expect('br list', 'Breakpoint not hit',
82            patterns = ['hit count = 1'])
83
84        self.runCmd("c")
85        self.assertTrue(wait_for_state(lldb.eStateRunning),
86            'Process not running after continue')
87
88        # make sure to detach while in running state (r204759)
89        self.runCmd("detach")
90        self.assertTrue(wait_for_state(lldb.eStateDetached),
91            'Process not detached after detach')
92
93if __name__ == '__main__':
94    import atexit
95    lldb.SBDebugger.Initialize()
96    atexit.register(lambda: lldb.SBDebugger.Terminate())
97    unittest2.main()
98
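The wait_for_state helper above polls the debugger's event queue until the process reaches the requested state or a timeout expires. The same pattern generalizes to any API that only exposes a state getter; the helper below is a generic sketch of that idea (wait_for and its parameters are illustrative names, not part of lldb or Playwright):

import time

def wait_for(predicate, timeout=5.0, period=0.1):
    # Poll `predicate` until it returns True or `timeout` seconds elapse.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(period)
    return False

# Example with a hypothetical getter:
# wait_for(lambda: get_state() == "running", timeout=5)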

test_update.py

Source: test_update.py (GitHub)

1import pytest
2import grpc
3import logging
4import operator
5import time
6import random
7
8from peloton_client.pbgen.peloton.api.v0.task import task_pb2
9from peloton_client.pbgen.peloton.api.v1alpha.job.stateless import (
10    stateless_pb2,
11)
12from peloton_client.pbgen.peloton.api.v1alpha.job.stateless.stateless_pb2 import (
13    JobSpec,
14)
15from peloton_client.pbgen.peloton.api.v1alpha.pod import pod_pb2
16from peloton_client.pbgen.peloton.api.v0.host import host_pb2
17
18from google.protobuf import json_format
19
20from tests.integration.conftest import get_container
21from tests.integration.conf_util import minicluster_type
22from tests.integration.stateless_job_test.util import (
23    assert_pod_id_changed,
24    assert_pod_spec_changed,
25    assert_pod_spec_equal,
26    assert_pod_id_equal,
27    get_host_to_task_count,
28)
29from tests.integration.stateless_update import StatelessUpdate
30from tests.integration.util import load_test_config
31from tests.integration.stateless_job import StatelessJob
32from tests.integration.common import IntegrationTestConfig
33from tests.integration.stateless_job import INVALID_ENTITY_VERSION_ERR_MESSAGE
34from tests.integration.host import (
35    get_host_in_state,
36    wait_for_host_state,
37    is_host_in_state,
38    query_hosts,
39)
40
41pytestmark = [
42    pytest.mark.default,
43    pytest.mark.stateless,
44    pytest.mark.update,
45    pytest.mark.random_order(disabled=True),
46]
47
48log = logging.getLogger(__name__)
49
50UPDATE_STATELESS_JOB_SPEC = "test_update_stateless_job_spec.yaml"
51UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC = (
52    "test_update_stateless_job_add_instances_spec.yaml"
53)
54UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC = (
55    "test_update_stateless_job_update_and_add_instances_spec.yaml"
56)
57UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC = (
58    "test_stateless_job_spec.yaml"
59)
60UPDATE_STATELESS_JOB_BAD_SPEC = "test_stateless_job_with_bad_spec.yaml"
61UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC = (
62    "test_stateless_job_failed_health_check_spec.yaml"
63)
64UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC = (
65    "test_stateless_job_successful_health_check_spec.yaml"
66)
67UPDATE_STATELESS_JOB_INVALID_SPEC = "test_stateless_job_spec_invalid.yaml"
68UPDATE_STATELESS_JOB_JOB_CONFIG_UPDATE_SPEC = (
69    "test_stateless_job_job_config_update_spec.yaml"
70)
71
72
73def update_stateless_job_spec():
74    if minicluster_type() != "k8s":
75        return "test_update_stateless_job_spec.yaml"
76    return "test_stateless_job_spec_k8s.yaml"
77
78
79def test__create_update(stateless_job, in_place):
80    stateless_job.create()
81    stateless_job.wait_for_state(goal_state="RUNNING")
82    old_pod_infos = stateless_job.query_pods()
83    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
84    update = StatelessUpdate(
85        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_SPEC
86    )
87    update.create(in_place=in_place)
88    update.wait_for_state(goal_state="SUCCEEDED")
89    new_pod_infos = stateless_job.query_pods()
90    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
91    assert_pod_id_changed(old_pod_infos, new_pod_infos)
92    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
93
94
95def test__create_update_add_instances(stateless_job, in_place):
96    stateless_job.create()
97    stateless_job.wait_for_state(goal_state="RUNNING")
98    old_pod_infos = stateless_job.query_pods()
99    update = StatelessUpdate(
100        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
101    )
102    update.create(in_place=in_place)
103    update.wait_for_state(goal_state="SUCCEEDED")
104    new_pod_infos = stateless_job.query_pods()
105    assert len(old_pod_infos) == 3
106    assert len(new_pod_infos) == 5
107
108
109def test__create_update_update_and_add_instances(stateless_job, in_place):
110    stateless_job.create()
111    stateless_job.wait_for_state(goal_state="RUNNING")
112    old_pod_infos = stateless_job.query_pods()
113    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
114    update = StatelessUpdate(
115        stateless_job,
116        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
117    )
118    update.create(in_place=in_place)
119    update.wait_for_state(goal_state="SUCCEEDED")
120    new_pod_infos = stateless_job.query_pods()
121    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
122    assert len(old_pod_infos) == 3
123    assert len(new_pod_infos) == 5
124    assert_pod_id_changed(old_pod_infos, new_pod_infos)
125    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
126
127
128def test__create_update_update_start_paused(stateless_job, in_place):
129    stateless_job.create()
130    stateless_job.wait_for_state(goal_state="RUNNING")
131    old_pod_infos = stateless_job.query_pods()
132    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
133    update = StatelessUpdate(
134        stateless_job,
135        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
136        start_paused=True,
137    )
138    update.create(in_place=in_place)
139    update.wait_for_state(goal_state="PAUSED")
140    update.resume()
141    update.wait_for_state(goal_state="SUCCEEDED")
142    new_pod_infos = stateless_job.query_pods()
143    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
144    assert len(old_pod_infos) == 3
145    assert len(new_pod_infos) == 5
146    assert_pod_id_changed(old_pod_infos, new_pod_infos)
147    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
148
149
150def test__create_update_with_batch_size(stateless_job, in_place):
151    stateless_job.create()
152    stateless_job.wait_for_state(goal_state="RUNNING")
153    old_pod_infos = stateless_job.query_pods()
154    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
155    update = StatelessUpdate(
156        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_SPEC, batch_size=1
157    )
158    update.create(in_place=in_place)
159    update.wait_for_state(goal_state="SUCCEEDED")
160    new_pod_infos = stateless_job.query_pods()
161    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
162    assert_pod_id_changed(old_pod_infos, new_pod_infos)
163    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
164
165
166def test__create_update_add_instances_with_batch_size(stateless_job, in_place):
167    stateless_job.create()
168    stateless_job.wait_for_state(goal_state="RUNNING")
169    old_pod_infos = stateless_job.query_pods()
170    update = StatelessUpdate(
171        stateless_job,
172        updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC,
173        batch_size=1,
174    )
175    update.create(in_place=in_place)
176    update.wait_for_state(goal_state="SUCCEEDED")
177    new_pod_infos = stateless_job.query_pods()
178    assert len(old_pod_infos) == 3
179    assert len(new_pod_infos) == 5
180
181
182def test__create_update_update_and_add_instances_with_batch(stateless_job, in_place):
183    stateless_job.create()
184    stateless_job.wait_for_state(goal_state="RUNNING")
185    old_pod_infos = stateless_job.query_pods()
186    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
187    update = StatelessUpdate(
188        stateless_job,
189        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
190        batch_size=1,
191    )
192    update.create(in_place=in_place)
193    update.wait_for_state(goal_state="SUCCEEDED")
194    new_pod_infos = stateless_job.query_pods()
195    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
196    assert len(old_pod_infos) == 3
197    assert len(new_pod_infos) == 5
198    assert_pod_id_changed(old_pod_infos, new_pod_infos)
199    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
200
201
202def test__create_update_bad_version(stateless_job, in_place):
203    stateless_job.create()
204    stateless_job.wait_for_state(goal_state="RUNNING")
205    update = StatelessUpdate(
206        stateless_job,
207        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
208        batch_size=1,
209    )
210    try:
211        update.create(entity_version="1-2-3", in_place=in_place)
212    except grpc.RpcError as e:
213        assert e.code() == grpc.StatusCode.ABORTED
214        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
215        return
216    raise Exception("entity version mismatch error not received")
217
218
219def test__pause_update_bad_version(stateless_job, in_place):
220    stateless_job.create()
221    stateless_job.wait_for_state(goal_state="RUNNING")
222    update = StatelessUpdate(
223        stateless_job,
224        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
225        batch_size=1,
226    )
227    update.create(in_place=in_place)
228    try:
229        update.pause(entity_version="1-2-3")
230    except grpc.RpcError as e:
231        assert e.code() == grpc.StatusCode.ABORTED
232        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
233        return
234    raise Exception("entity version mismatch error not received")
235
236
237def test__resume_update_bad_version(stateless_job, in_place):
238    stateless_job.create()
239    stateless_job.wait_for_state(goal_state="RUNNING")
240    update = StatelessUpdate(
241        stateless_job,
242        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
243        start_paused=True,
244        batch_size=1,
245    )
246    update.create(in_place=in_place)
247    try:
248        update.resume(entity_version="1-2-3")
249    except grpc.RpcError as e:
250        assert e.code() == grpc.StatusCode.ABORTED
251        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
252        return
253    raise Exception("entity version mismatch error not received")
254
255
256def test__abort_update_bad_version(stateless_job, in_place):
257    stateless_job.create()
258    stateless_job.wait_for_state(goal_state="RUNNING")
259    update = StatelessUpdate(
260        stateless_job,
261        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
262        batch_size=1,
263    )
264    update.create(in_place=in_place)
265    try:
266        update.abort(entity_version="1-2-3")
267    except grpc.RpcError as e:
268        assert e.code() == grpc.StatusCode.ABORTED
269        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
270        return
271    raise Exception("entity version mismatch error not received")
272
273
274def test__create_update_stopped_job(stateless_job, in_place):
275    stateless_job.create()
276    stateless_job.wait_for_state(goal_state="RUNNING")
277    old_pod_infos = stateless_job.query_pods()
278    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
279
280    old_pod_states = set()
281    for pod_info in old_pod_infos:
282        old_pod_states.add(pod_info.spec.pod_name.value)
283
284    stateless_job.stop()
285    stateless_job.wait_for_state(goal_state="KILLED")
286    update = StatelessUpdate(
287        stateless_job,
288        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
289        batch_size=1,
290    )
291    update.create(in_place=in_place)
292    stateless_job.start()
293    update.wait_for_state(goal_state="SUCCEEDED")
294    stateless_job.wait_for_state(goal_state="RUNNING")
295    new_pod_infos = stateless_job.query_pods()
296    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
297    assert len(old_pod_infos) == 3
298    assert len(new_pod_infos) == 5
299    assert_pod_id_changed(old_pod_infos, new_pod_infos)
300    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
301
302    # Only new instances should be RUNNING
303    for pod_info in new_pod_infos:
304        if pod_info.spec.pod_name.value in old_pod_states:
305            assert pod_info.status.state == pod_pb2.POD_STATE_KILLED
306        else:
307            assert pod_info.status.state == pod_pb2.POD_STATE_RUNNING
308
309
310def test__create_update_stopped_tasks(stateless_job, in_place):
311    stateless_job.create()
312    stateless_job.wait_for_state(goal_state="RUNNING")
313    old_pod_infos = stateless_job.query_pods()
314    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
315    stateless_job.stop()
316    update = StatelessUpdate(
317        stateless_job,
318        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
319        batch_size=1,
320    )
321    update.create(in_place=in_place)
322    update.wait_for_state(goal_state="SUCCEEDED")
323    stateless_job.wait_for_state(goal_state="KILLED")
324    stateless_job.start()
325    stateless_job.wait_for_state(goal_state="RUNNING")
326    new_pod_infos = stateless_job.query_pods()
327    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
328    assert len(old_pod_infos) == 3
329    assert len(new_pod_infos) == 5
330    assert_pod_id_changed(old_pod_infos, new_pod_infos)
331    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
332
333
334def test__create_multiple_consecutive_updates(stateless_job, in_place):
335    stateless_job.create()
336    stateless_job.wait_for_state(goal_state="RUNNING")
337    old_pod_infos = stateless_job.query_pods()
338    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
339    update1 = StatelessUpdate(
340        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
341    )
342    update1.create(in_place=in_place)
343    update2 = StatelessUpdate(
344        stateless_job,
345        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
346        batch_size=1,
347    )
348    update2.create(in_place=in_place)
349    update2.wait_for_state(goal_state="SUCCEEDED")
350    new_pod_infos = stateless_job.query_pods()
351    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
352    assert len(old_pod_infos) == 3
353    assert len(new_pod_infos) == 5
354    assert_pod_id_changed(old_pod_infos, new_pod_infos)
355    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
356
357
358def test__abort_update(stateless_job, in_place):
359    stateless_job.create()
360    stateless_job.wait_for_state(goal_state="RUNNING")
361    update = StatelessUpdate(
362        stateless_job,
363        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
364        batch_size=1,
365    )
366    update.create(in_place=in_place)
367    update.wait_for_state(goal_state="ROLLING_FORWARD")
368    update.abort()
369    update.wait_for_state(goal_state="ABORTED")
370
371
372def test__update_reduce_instances(stateless_job, in_place):
373    stateless_job.create()
374    stateless_job.wait_for_state(goal_state="RUNNING")
375    old_pod_infos = stateless_job.query_pods()
376    assert len(old_pod_infos) == 3
377    # first increase instances
378    update = StatelessUpdate(
379        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
380    )
381    update.create()
382    update.wait_for_state(goal_state="SUCCEEDED")
383    new_pod_infos = stateless_job.query_pods()
384    assert len(new_pod_infos) == 5
385    # now reduce instances
386    update = StatelessUpdate(
387        stateless_job,
388        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
389    )
390    update.create(in_place=in_place)
391    update.wait_for_state(goal_state="SUCCEEDED")
392    new_pod_infos = stateless_job.query_pods()
393    assert len(new_pod_infos) == 3
394    # now increase back again
395    update = StatelessUpdate(
396        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
397    )
398    update.create()
399    update.wait_for_state(goal_state="SUCCEEDED")
400    new_pod_infos = stateless_job.query_pods()
401    assert len(new_pod_infos) == 5
402
403
404def test__update_reduce_instances_stopped_tasks(stateless_job, in_place):
405    stateless_job.create()
406    stateless_job.wait_for_state(goal_state="RUNNING")
407    old_pod_infos = stateless_job.query_pods()
408    assert len(old_pod_infos) == 3
409    # first increase instances
410    update = StatelessUpdate(
411        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
412    )
413    update.create(in_place=in_place)
414    update.wait_for_state(goal_state="SUCCEEDED")
415    new_pod_infos = stateless_job.query_pods()
416    assert len(new_pod_infos) == 5
417    # now stop last 2 tasks
418    ranges = task_pb2.InstanceRange(to=5)
419    setattr(ranges, "from", 3)
420    stateless_job.stop(ranges=[ranges])
421    # now reduce instance count
422    update = StatelessUpdate(
423        stateless_job,
424        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
425    )
426    update.create(in_place=in_place)
427    update.wait_for_state(goal_state="SUCCEEDED")
428    new_pod_infos = stateless_job.query_pods()
429    assert len(new_pod_infos) == 3
430
431
432# test__create_update_with_bad_config tests creating an update with bad config
433# without rollback
434def test__create_update_with_bad_config(stateless_job, in_place):
435    stateless_job.create()
436    stateless_job.wait_for_state(goal_state="RUNNING")
437    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
438
439    update = StatelessUpdate(
440        stateless_job,
441        updated_job_file=UPDATE_STATELESS_JOB_BAD_SPEC,
442        max_failure_instances=3,
443        max_instance_attempts=1,
444    )
445    update.create(in_place=in_place)
446    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
447    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
448
449    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
450    for pod_info in stateless_job.query_pods():
451        assert pod_info.status.state == pod_pb2.POD_STATE_FAILED
452
453
454# test__create_update_add_instances_with_bad_config
455# tests creating an update with bad config and more instances
456# without rollback
457def test__create_update_add_instances_with_bad_config(stateless_job, in_place):
458    stateless_job.create()
459    stateless_job.wait_for_state(goal_state="RUNNING")
460
461    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
462    updated_job_spec = JobSpec()
463    json_format.ParseDict(job_spec_dump, updated_job_spec)
464
465    updated_job_spec.instance_count = stateless_job.job_spec.instance_count + 3
466
467    update = StatelessUpdate(
468        stateless_job,
469        batch_size=1,
470        updated_job_spec=updated_job_spec,
471        max_failure_instances=1,
472        max_instance_attempts=1,
473    )
474    update.create(in_place=in_place)
475    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
476
477    # only one instance should be added
478    assert (
479        len(stateless_job.query_pods())
480        == stateless_job.job_spec.instance_count + 1
481    )
482
483
484# test__create_update_reduce_instances_with_bad_config
485# tests creating an update with bad config and fewer instances
486# without rollback
487def test__create_update_reduce_instances_with_bad_config(stateless_job, in_place):
488    stateless_job.create()
489    stateless_job.wait_for_state(goal_state="RUNNING")
490    old_pod_infos = stateless_job.query_pods()
491
492    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
493    updated_job_spec = JobSpec()
494    json_format.ParseDict(job_spec_dump, updated_job_spec)
495
496    updated_job_spec.instance_count = stateless_job.job_spec.instance_count - 1
497
498    update = StatelessUpdate(
499        stateless_job,
500        updated_job_spec=updated_job_spec,
501        batch_size=1,
502        max_failure_instances=1,
503        max_instance_attempts=1,
504    )
505    update.create(in_place=in_place)
506    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
507    new_pod_infos = stateless_job.query_pods()
508    assert len(old_pod_infos) == len(new_pod_infos)
509
510
511# test__create_update_with_failed_health_check
512# tests an update fails even if the new task state is RUNNING,
513# as long as the health check fails
514def test__create_update_with_failed_health_check(stateless_job, in_place):
515    stateless_job.create()
516    stateless_job.wait_for_state(goal_state="RUNNING")
517
518    update = StatelessUpdate(
519        stateless_job,
520        updated_job_file=UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC,
521        max_failure_instances=1,
522        max_instance_attempts=1,
523    )
524    update.create(in_place=in_place)
525    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
526
527
528# test__create_update_to_disable_health_check tests an update which
529# disables healthCheck
530def test__create_update_to_disable_health_check(in_place):
531    job = StatelessJob(
532        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
533        config=IntegrationTestConfig(
534            max_retry_attempts=100,
535            pool_file='test_stateless_respool.yaml',
536        ),
537    )
538    job.create()
539    job.wait_for_state(goal_state="RUNNING")
540
541    job.job_spec.default_spec.containers[0].liveness_check.enabled = False
542    update = StatelessUpdate(
543        job,
544        updated_job_spec=job.job_spec,
545        max_failure_instances=1,
546        max_instance_attempts=1,
547    )
548    update.create(in_place=in_place)
549    update.wait_for_state(goal_state="SUCCEEDED")
550
551
552# test__create_update_to_enable_health_check tests an update which
553# enables healthCheck
554def test__create_update_to_enable_health_check(in_place):
555    job = StatelessJob(
556        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
557        config=IntegrationTestConfig(
558            max_retry_attempts=100,
559            pool_file='test_stateless_respool.yaml',
560        ),
561    )
562    job.job_spec.default_spec.containers[0].liveness_check.enabled = False
563    job.create()
564    job.wait_for_state(goal_state="RUNNING")
565
566    job.job_spec.default_spec.containers[0].liveness_check.enabled = True
567    update = StatelessUpdate(
568        job,
569        updated_job_spec=job.job_spec,
570        max_failure_instances=1,
571        max_instance_attempts=1,
572    )
573    update.create(in_place=in_place)
574    update.wait_for_state(goal_state="SUCCEEDED")
575
576
577# test__create_update_to_unset_health_check tests an update to unset
578# health check config
579def test__create_update_to_unset_health_check(in_place):
580    job = StatelessJob(
581        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
582        config=IntegrationTestConfig(
583            max_retry_attempts=100,
584            pool_file='test_stateless_respool.yaml',
585        ),
586    )
587    job.create()
588    job.wait_for_state(goal_state="RUNNING")
589
590    update = StatelessUpdate(
591        job,
592        updated_job_file=UPDATE_STATELESS_JOB_SPEC,
593        max_failure_instances=1,
594        max_instance_attempts=1,
595    )
596    update.create(in_place=in_place)
597    update.wait_for_state(goal_state="SUCCEEDED")
598
599
600# test__create_update_to_set_health_check tests an update to set
601# health check config for a job without health check set
602def test__create_update_to_set_health_check(in_place):
603    job = StatelessJob(
604        job_file=UPDATE_STATELESS_JOB_SPEC,
605        config=IntegrationTestConfig(
606            max_retry_attempts=100,
607            pool_file='test_stateless_respool.yaml',
608        ),
609    )
610    job.create()
611    job.wait_for_state(goal_state="RUNNING")
612
613    update = StatelessUpdate(
614        job,
615        updated_job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
616        max_failure_instances=1,
617        max_instance_attempts=1,
618    )
619    update.create(in_place=in_place)
620    update.wait_for_state(goal_state="SUCCEEDED")
621
622
623# test__create_update_to_change_health_check_config tests an update which
624# changes healthCheck
625def test__create_update_to_change_health_check_config(in_place):
626    job = StatelessJob(
627        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
628        config=IntegrationTestConfig(
629            max_retry_attempts=100,
630            pool_file='test_stateless_respool.yaml',
631        ),
632    )
633    job.job_spec.default_spec.containers[0].liveness_check.enabled = False
634    job.create()
635    job.wait_for_state(goal_state="RUNNING")
636
637    job.job_spec.default_spec.containers[
638        0
639    ].liveness_check.initial_interval_secs = 2
640    update = StatelessUpdate(
641        job,
642        updated_job_spec=job.job_spec,
643        max_failure_instances=1,
644        max_instance_attempts=1,
645    )
646    update.create(in_place=in_place)
647    update.wait_for_state(goal_state="SUCCEEDED")
648
649
650# test__auto_rollback_update_with_bad_config tests creating an update with bad config
651# with rollback
652def test__auto_rollback_update_with_bad_config(stateless_job, in_place):
653    stateless_job.create()
654    stateless_job.wait_for_state(goal_state="RUNNING")
655    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
656
657    update = StatelessUpdate(
658        stateless_job,
659        updated_job_file=UPDATE_STATELESS_JOB_BAD_SPEC,
660        roll_back_on_failure=True,
661        max_failure_instances=1,
662        max_instance_attempts=1,
663    )
664    update.create(in_place=in_place)
665    update.wait_for_state(goal_state="ROLLED_BACK")
666    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
667
668    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)
669
670
671# test__auto_rollback_update_add_instances_with_bad_config
672# tests creating an update with bad config and more instances
673# with rollback
674def test__auto_rollback_update_add_instances_with_bad_config(stateless_job, in_place):
675    stateless_job.create()
676    stateless_job.wait_for_state(goal_state="RUNNING")
677    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
678
679    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
680    updated_job_spec = JobSpec()
681    json_format.ParseDict(job_spec_dump, updated_job_spec)
682
683    updated_job_spec.instance_count = stateless_job.job_spec.instance_count + 3
684
685    update = StatelessUpdate(
686        stateless_job,
687        updated_job_spec=updated_job_spec,
688        roll_back_on_failure=True,
689        max_failure_instances=1,
690        max_instance_attempts=1,
691    )
692    update.create(in_place=in_place)
693    update.wait_for_state(goal_state="ROLLED_BACK")
694    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
695
696    # no instance should be added
697    assert (
698        len(stateless_job.query_pods())
699        == stateless_job.job_spec.instance_count
700    )
701    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)
702
703
704# test__auto_rollback_update_reduce_instances_with_bad_config
705# tests creating an update with bad config and fewer instances
706# with rollback
707def test__auto_rollback_update_reduce_instances_with_bad_config(stateless_job, in_place):
708    stateless_job.create()
709    stateless_job.wait_for_state(goal_state="RUNNING")
710    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
711
712    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
713    updated_job_spec = JobSpec()
714    json_format.ParseDict(job_spec_dump, updated_job_spec)
715
716    updated_job_spec.instance_count = stateless_job.job_spec.instance_count - 1
717
718    update = StatelessUpdate(
719        stateless_job,
720        updated_job_spec=updated_job_spec,
721        roll_back_on_failure=True,
722        max_failure_instances=1,
723        max_instance_attempts=1,
724    )
725    update.create(in_place=in_place)
726    update.wait_for_state(goal_state="ROLLED_BACK")
727    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
728
729    # no instance should be removed
730    assert (
731        len(stateless_job.query_pods())
732        == stateless_job.job_spec.instance_count
733    )
734    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)
735
736
737# test__auto_rollback_update_with_failed_health_check
738# tests an update fails even if the new task state is RUNNING,
739# as long as the health check fails
740def test__auto_rollback_update_with_failed_health_check(stateless_job, in_place):
741    stateless_job.create()
742    stateless_job.wait_for_state(goal_state="RUNNING")
743    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
744
745    update = StatelessUpdate(
746        stateless_job,
747        updated_job_file=UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC,
748        roll_back_on_failure=True,
749        max_failure_instances=1,
750        max_instance_attempts=1,
751    )
752    update.create(in_place=in_place)
753    update.wait_for_state(goal_state="ROLLED_BACK")
754    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
755    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)
756
757
758# test__pause_resume_initialized_update tests pausing and resuming
759# an update in the INITIALIZED state
760def test__pause_resume_initialized_update(stateless_job, in_place):
761    stateless_job.create()
762    stateless_job.wait_for_state(goal_state="RUNNING")
763    old_pod_infos = stateless_job.query_pods()
764    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
765    update = StatelessUpdate(
766        stateless_job, batch_size=1, updated_job_file=UPDATE_STATELESS_JOB_SPEC
767    )
768    update.create(in_place=in_place)
769    # immediately pause the update, so the update may still be INITIALIZED
770    update.pause()
771    update.wait_for_state(goal_state="PAUSED")
772    update.resume()
773    update.wait_for_state(goal_state="SUCCEEDED")
774    new_pod_infos = stateless_job.query_pods()
775    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
776    assert_pod_id_changed(old_pod_infos, new_pod_infos)
777    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
778
779
780# test__pause_resume__update tests pausing and resuming an update
781def test__pause_resume__update(stateless_job, in_place):
782    stateless_job.create()
783    stateless_job.wait_for_state(goal_state="RUNNING")
784    old_pod_infos = stateless_job.query_pods()
785    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
786    update = StatelessUpdate(
787        stateless_job, batch_size=1, updated_job_file=UPDATE_STATELESS_JOB_SPEC
788    )
789    update.create(in_place=in_place)
790    # sleep for 1 sec so update can begin to roll forward
791    time.sleep(1)
792    update.pause()
793    update.wait_for_state(goal_state="PAUSED")
794    update.resume()
795    update.wait_for_state(goal_state="SUCCEEDED")
796    new_pod_infos = stateless_job.query_pods()
797    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
798    assert_pod_id_changed(old_pod_infos, new_pod_infos)
799    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
800
801
802# test_manual_rollback manually rolls back a running update when
803# the instance count is reduced in the rollback.
804# Note that manual rollback in peloton is just updating to the
805# previous job configuration
806def test_manual_rollback_reduce_instances(stateless_job, in_place):
807    stateless_job.create()
808    stateless_job.wait_for_state(goal_state="RUNNING")
809    old_pod_infos = stateless_job.query_pods()
810    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
811    update = StatelessUpdate(
812        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
813    )
814    update.create(in_place=in_place)
815    # manually rollback the update
816    update2 = StatelessUpdate(
817        stateless_job,
818        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
819    )
820    update2.create(in_place=in_place)
821    update2.wait_for_state(goal_state="SUCCEEDED")
822    new_pod_infos = stateless_job.query_pods()
823    assert len(old_pod_infos) == len(new_pod_infos)
824    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
825    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)
826
827
828# test_manual_rollback manually rolls back a running update when
829# the instance count is increased in the rollback
830def test_manual_rollback_increase_instances(stateless_job, in_place):
831    stateless_job.create()
832    stateless_job.wait_for_state(goal_state="RUNNING")
833    update = StatelessUpdate(
834        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
835    )
836    update.create(in_place=in_place)
837    update.wait_for_state(goal_state="SUCCEEDED")
838    old_pod_infos = stateless_job.query_pods()
839    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
840    # reduce instance count and then roll it back
841    update2 = StatelessUpdate(
842        stateless_job,
843        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
844    )
845    update2.create(in_place=in_place)
846    update3 = StatelessUpdate(
847        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
848    )
849    update3.create(in_place=in_place)
850    update3.wait_for_state(goal_state="SUCCEEDED")
851    new_pod_infos = stateless_job.query_pods()
852    assert len(old_pod_infos) == len(new_pod_infos)
853    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
854    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)
855
856
857# test_auto_rollback_reduce_instances
858#  rolls back a failed update when
859# the instance count is reduced in the rollback.
860def test_auto_rollback_reduce_instances(stateless_job, in_place):
861    stateless_job.create()
862    stateless_job.wait_for_state(goal_state="RUNNING")
863
864    job_spec_dump = load_test_config(
865        UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC
866    )
867    updated_job_spec = JobSpec()
868    json_format.ParseDict(job_spec_dump, updated_job_spec)
869
870    # increase the instance count
871    updated_job_spec.instance_count = stateless_job.job_spec.instance_count + 3
872
873    update = StatelessUpdate(
874        stateless_job,
875        updated_job_spec=updated_job_spec,
876        roll_back_on_failure=True,
877        max_instance_attempts=1,
878        max_failure_instances=1,
879        batch_size=1,
880    )
881    update.create(in_place=in_place)
882    update.wait_for_state(goal_state="ROLLED_BACK")
883    assert (
884        len(stateless_job.query_pods())
885        == stateless_job.job_spec.instance_count
886    )
887
888
889# test_update_create_failure_invalid_spec tests the
890# update create failure due to invalid spec in request
891def test_update_create_failure_invalid_spec(stateless_job, in_place):
892    stateless_job.create()
893    stateless_job.wait_for_state(goal_state="RUNNING")
894
895    update = StatelessUpdate(
896        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_INVALID_SPEC
897    )
898    try:
899        update.create(in_place=in_place)
900    except grpc.RpcError as e:
901        assert e.code() == grpc.StatusCode.INVALID_ARGUMENT
902        return
903    raise Exception("job spec validation error not received")
904
905
906# test_update_killed_job tests updating a killed job.
907# The job should be updated but still remain in killed state
908def test_update_killed_job(in_place):
909    job = StatelessJob(job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC)
910    job.create()
911    job.wait_for_state(goal_state="RUNNING")
912
913    job.stop()
914    job.wait_for_state(goal_state="KILLED")
915
916    update = StatelessUpdate(
917        job, updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC
918    )
919    update.create(in_place=in_place)
920    update.wait_for_state(goal_state="SUCCEEDED")
921
922    assert job.get_spec().instance_count == 3
923    assert job.get_status().state == stateless_pb2.JOB_STATE_KILLED
924
925
926# test_start_job_with_active_update tests
927# starting a job with an active update
928def test_start_job_with_active_update(stateless_job, in_place):
929    stateless_job.create()
930    stateless_job.wait_for_state(goal_state="RUNNING")
931    assert len(stateless_job.query_pods()) == 3
932    stateless_job.stop()
933
934    update = StatelessUpdate(
935        stateless_job,
936        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
937        batch_size=1,
938    )
939
940    update.create(in_place=in_place)
941    stateless_job.start()
942
943    update.wait_for_state(goal_state="SUCCEEDED")
944    stateless_job.wait_for_all_pods_running()
945    assert len(stateless_job.query_pods()) == 5
946
947
948# test_stop_running_job_with_active_update_add_instances tests
949# stopping a running job with an active update(add instances)
950def test_stop_running_job_with_active_update_add_instances(stateless_job, in_place):
951    stateless_job.create()
952    stateless_job.wait_for_state(goal_state="RUNNING")
953    assert len(stateless_job.query_pods()) == 3
954
955    update = StatelessUpdate(
956        stateless_job,
957        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
958        batch_size=1,
959    )
960    update.create(in_place=in_place)
961    update.wait_for_state(goal_state="ROLLING_FORWARD")
962
963    stateless_job.stop()
964    update.wait_for_state(goal_state="SUCCEEDED")
965    assert stateless_job.get_spec().instance_count == 5
966
967
968# test_stop_running_job_with_active_update_remove_instances tests
969# stopping a running job with an active update(remove instances)
970def test_stop_running_job_with_active_update_remove_instances(in_place):
971    stateless_job = StatelessJob(
972        job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
973    )
974    stateless_job.create()
975    stateless_job.wait_for_state(goal_state="RUNNING")
976    assert len(stateless_job.query_pods()) == 5
977
978    update = StatelessUpdate(
979        stateless_job,
980        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
981        batch_size=1,
982    )
983    update.create(in_place=in_place)
984    update.wait_for_state(goal_state="ROLLING_FORWARD")
985
986    stateless_job.stop()
987    update.wait_for_state(goal_state="SUCCEEDED")
988    assert stateless_job.get_spec().instance_count == 3
989
990
991# test_stop_running_job_with_active_update_same_instance_count tests stopping
992# a running job with an active update that doesn't change instance count
993def test_stop_running_job_with_active_update_same_instance_count(
994    stateless_job,
995    in_place
996):
997    stateless_job.create()
998    stateless_job.wait_for_state(goal_state="RUNNING")
999
1000    stateless_job.job_spec.default_spec.containers[
1001        0
1002    ].command.value = "sleep 100"
1003    update = StatelessUpdate(
1004        stateless_job,
1005        updated_job_spec=stateless_job.job_spec,
1006        max_failure_instances=1,
1007        max_instance_attempts=1,
1008    )
1009    update.create(in_place=in_place)
1010    stateless_job.stop()
1011    update.wait_for_state(goal_state="SUCCEEDED")
1012    assert stateless_job.get_spec().instance_count == 3
1013    assert (
1014        stateless_job.get_spec().default_spec.containers[0].command.value
1015        == "sleep 100"
1016    )
1017
1018
1019# test__create_update_before_job_fully_created creates an update
1020# right after a job is created. It tests the case that job can be
1021# updated before it is fully created
1022def test__create_update_before_job_fully_created(stateless_job, in_place):
1023    stateless_job.create()
1024    update = StatelessUpdate(
1025        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_SPEC
1026    )
1027    update.create(in_place=in_place)
1028    update.wait_for_state(goal_state="SUCCEEDED")
1029    assert (
1030        stateless_job.get_spec().default_spec.containers[0].command.value
1031        == "while :; do echo updated; sleep 10; done"
1032    )
1033
1034
1035# test__in_place_update_success_rate tests that in-place update
1036# should succeed when every daemon is in healthy state.
1037# It starts a job with 30 instances, starts an in-place update without a
1038# batch size, and then checks whether any pod ends up running on an
1039# unexpected host.
1040# TODO: Re-enable k8s when it stops being flaky.
1041# @pytest.mark.k8s
1042def test__in_place_update_success_rate(stateless_job):
1043    stateless_job.job_spec.instance_count = 30
1044    stateless_job.create()
1045    stateless_job.wait_for_all_pods_running()
1046    old_pod_infos = stateless_job.query_pods()
1047
1048    job_spec_dump = load_test_config(update_stateless_job_spec())
1049    updated_job_spec = JobSpec()
1050    json_format.ParseDict(job_spec_dump, updated_job_spec)
1051
1052    updated_job_spec.instance_count = 30
1053    if minicluster_type() == "k8s":
1054        updated_job_spec.default_spec.containers[0].resource.mem_limit_mb = 0.1
1055
1056    update = StatelessUpdate(stateless_job,
1057                             updated_job_spec=updated_job_spec,
1058                             batch_size=0)
1059    update.create(in_place=True)
1060    update.wait_for_state(goal_state='SUCCEEDED')
1061
1062    new_pod_infos = stateless_job.query_pods()
1063
1064    old_pod_dict = {}
1065    new_pod_dict = {}
1066
1067    for old_pod_info in old_pod_infos:
1068        split_index = old_pod_info.status.pod_id.value.rfind('-')
1069        pod_name = old_pod_info.status.pod_id.value[:split_index]
1070        old_pod_dict[pod_name] = old_pod_info.status.host
1071
1072    for new_pod_info in new_pod_infos:
1073        split_index = new_pod_info.status.pod_id.value.rfind('-')
1074        pod_name = new_pod_info.status.pod_id.value[:split_index]
1075        new_pod_dict[pod_name] = new_pod_info.status.host
1076
1077    count = 0
1078    for pod_name, pod_id in old_pod_dict.items():
1079        if new_pod_dict[pod_name] != old_pod_dict[pod_name]:
1080            log.info("%s, prev:%s, cur:%s", pod_name,
1081                     old_pod_dict[pod_name], new_pod_dict[pod_name])
1082            count = count + 1
1083    log.info("total mismatch: %d", count)
1084    assert count == 0
1085
1086
1087# test__in_place_kill_job_release_host tests that killing a job with an
1088# ongoing in-place update releases its hosts, so that a second
1089# update can complete
1090def test__in_place_kill_job_release_host():
1091    job1 = StatelessJob(
1092        job_file="test_stateless_job_spec.yaml",
1093    )
1094    job1.create()
1095    job1.wait_for_state(goal_state="RUNNING")
1096
1097    job2 = StatelessJob(
1098        job_file="test_stateless_job_spec.yaml",
1099    )
1100    job2.create()
1101    job2.wait_for_state(goal_state="RUNNING")
1102
1103    update1 = StatelessUpdate(job1,
1104                              updated_job_file=UPDATE_STATELESS_JOB_SPEC,
1105                              batch_size=0)
1106    update1.create(in_place=True)
1107    # stop the update
1108    job1.stop()
1109
1110    update2 = StatelessUpdate(job2,
1111                              updated_job_file=UPDATE_STATELESS_JOB_SPEC,
1112                              batch_size=0)
1113    update2.create()
1114
1115    # both updates should complete
1116    update1.wait_for_state(goal_state="SUCCEEDED")
1117    update2.wait_for_state(goal_state="SUCCEEDED")
1118
1119
1120@pytest.mark.skip(reason="flaky test")
1121def test__in_place_update_host_maintenance(stateless_job, maintenance):
1122    # add enough instances so each host should have some tasks running
1123    stateless_job.job_spec.instance_count = 9
1124    # need extra retry attempts, since the in-place update needs more time
1125    # to process while an agent is in maintenance mode
1126    stateless_job.config = IntegrationTestConfig(
1127        max_retry_attempts=300,
1128        pool_file='test_stateless_respool.yaml',
1129    )
1130    stateless_job.create()
1131    stateless_job.wait_for_all_pods_running()
1132
1133    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_SPEC)
1134    updated_job_spec = JobSpec()
1135    json_format.ParseDict(job_spec_dump, updated_job_spec)
1136
1137    updated_job_spec.instance_count = 9
1138    update = StatelessUpdate(stateless_job,
1139                             updated_job_spec=updated_job_spec,
1140                             batch_size=0)
1141    update.create(in_place=True)
1142
1143    # Pick a host that is UP and start maintenance on it
1144    test_host = get_host_in_state(host_pb2.HOST_STATE_UP)
1145    resp = maintenance["start"]([test_host])
1146    assert resp
1147
1148    wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)
1149    update.wait_for_state(goal_state="SUCCEEDED")
1150
1151
1152def test__update_with_sla_aware_host_maintenance(stateless_job, maintenance):
1153    """
1154    1. Create a stateless job with 3 instances.
1155    2. Create a job update that updates the job to instance count 2,
1156    add host-limit-1 constraint and define sla with maximum_unavailable_instances=1
1157    3. Start host maintenance on one of the hosts
1158    4. The host should transition to DOWN and the update workflow should SUCCEED
1159    """
1160    stateless_job.create()
1161    stateless_job.wait_for_all_pods_running()
1162
1163    job_spec_dump = load_test_config('test_stateless_job_spec_sla.yaml')
1164    updated_job_spec = JobSpec()
1165    json_format.ParseDict(job_spec_dump, updated_job_spec)
1166    updated_job_spec.instance_count = 2
1167
1168    update = StatelessUpdate(stateless_job,
1169                             updated_job_spec=updated_job_spec,
1170                             batch_size=1)
1171    update.create()
1172
1173    # Pick a host that is UP and start maintenance on it
1174    test_host = get_host_in_state(host_pb2.HOST_STATE_UP)
1175    resp = maintenance["start"]([test_host])
1176    assert resp
1177
1178    update.wait_for_state(goal_state="SUCCEEDED")
1179    wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)
1180
1181
1182def test__update_with_host_maintenance_and_agent_down(stateless_job, maintenance):
1183    """
1184    1. Create a large stateless job (that takes up more than two-thirds of
1185       the cluster resources) with MaximumUnavailableInstances=2.
1186    2. Start host maintenance on one of the hosts (say A) having pods of the job.
1187       MaximumUnavailableInstances=2 ensures that not more than 2 pods are
1188       unavailable due to host maintenance at a time.
1189    3. Take down another host which has pods running on it. This will cause
1190       TASK_LOST to be sent for all pods on the host after 75 seconds.
1191    4. Start an update to modify the instance spec of one of the pods.
1192    5. Since TASK_LOST would cause the job SLA to be violated, instances on the
1193       host A should not be killed once LOST event is received. Verify that
1194       host A does not transition to DOWN.
1195    """
1196    stateless_job.job_spec.instance_count = 30
1197    stateless_job.job_spec.default_spec.containers[0].resource.cpu_limit = 0.3
1198    stateless_job.job_spec.sla.maximum_unavailable_instances = 2
1199    stateless_job.create()
1200    stateless_job.wait_for_all_pods_running()
1201
1202    hosts = [h.hostname for h in query_hosts([]).host_infos]
1203    host_to_task_count = get_host_to_task_count(hosts, stateless_job)
1204    sorted_hosts = [t[0] for t in sorted(
1205        host_to_task_count.items(), key=operator.itemgetter(1), reverse=True)]
1206
1207    # Pick a host that has pods running on it to start maintenance on it.
1208    test_host = sorted_hosts[0]
1209    # pick another host which has pods of the job to take down
1210    host_container = get_container([sorted_hosts[1]])
1211
1212    try:
1213        host_container.stop()
1214        maintenance["start"]([test_host])
1215
1216        stateless_job.job_spec.instance_spec[10].containers.extend(
1217            [pod_pb2.ContainerSpec(resource=pod_pb2.ResourceSpec(disk_limit_mb=20))])
1218        update = StatelessUpdate(stateless_job,
1219                                 updated_job_spec=stateless_job.job_spec,
1220                                 batch_size=0)
1221        update.create()
1222        update.wait_for_state(goal_state="SUCCEEDED")
1223
1224        stateless_job.stop()
1225
1226        wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)
1227        assert False, 'Host should not transition to DOWN'
1228    except:
1229        assert is_host_in_state(test_host, host_pb2.HOST_STATE_DRAINING)
1230        pass
1231    finally:
1232        host_container.start()
1233
1234
1235def test__update_with_host_maintenance__bad_config(stateless_job, maintenance):
1236    """
1237    1. Create a stateless job with 6 instances. Wait for all instances to reach
1238       RUNNING state. This means that there is at least one host with 2 or more
1239       instances on it
1240    2. Start a bad job update with max failure tolerance of 1 and auto-rollback
1241       disabled.
1242    3. Start host maintenance on one of the hosts (say host A).
1243    4. Wait for the update to fail. There should be 2 instances unavailable.
1244    5. Since 2 instances are already unavailable and
1245       maximum_unavailable_instances=1, host maintenance should not proceed.
1246       Verify that the host A doesn't transition to DOWN.
1247    """
1248    stateless_job.job_spec.sla.maximum_unavailable_instances = 1
1249    stateless_job.job_spec.instance_count = 6
1250    stateless_job.create()
1251    stateless_job.wait_for_all_pods_running()
1252
1253    hosts = [h.hostname for h in query_hosts([]).host_infos]
1254    host_to_task_count = get_host_to_task_count(hosts, stateless_job)
1255    sorted_hosts = [t[0] for t in sorted(
1256        host_to_task_count.items(), key=operator.itemgetter(1), reverse=True)]
1257
1258    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
1259    updated_job_spec = JobSpec()
1260    json_format.ParseDict(job_spec_dump, updated_job_spec)
1261    updated_job_spec.instance_count = 6
1262    updated_job_spec.sla.maximum_unavailable_instances = 1
1263    update = StatelessUpdate(
1264        stateless_job,
1265        updated_job_spec=updated_job_spec,
1266        max_failure_instances=1,
1267        max_instance_attempts=1,
1268        batch_size=2,
1269    )
1270    update.create()
1271
1272    # Pick a host that has pods running on it to start maintenance on it.
1273    test_host = sorted_hosts[0]
1274    maintenance["start"]([test_host])
1275
1276    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
1277
1278    try:
1279        wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)
1280        assert False, 'Host should not transition to DOWN'
1281    except:
1282        assert is_host_in_state(test_host, host_pb2.HOST_STATE_DRAINING)
1283
1284
1285# test__create_update_update_job_config tests update job level config
1286# would not trigger task restart
1287def test__create_update_update_job_config(stateless_job):
1288    stateless_job.create()
1289    stateless_job.wait_for_all_pods_running()
1290    old_pod_infos = stateless_job.query_pods()
1291    update = StatelessUpdate(
1292        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_JOB_CONFIG_UPDATE_SPEC
1293    )
1294    update.create()
1295    update.wait_for_state(goal_state="SUCCEEDED")
1296    new_pod_infos = stateless_job.query_pods()
1297    assert_pod_id_equal(old_pod_infos, new_pod_infos)
1298
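The StatelessJob and StatelessUpdate helpers above expose wait_for_state(goal_state=..., failed_state=...), whose implementation is not shown in this snippet. Judging by how it is called, it is a polling loop that keeps reading the current state, succeeds on the goal state, and fails fast on a terminal failure state. The class below is our own illustrative sketch of that idea, not the Peloton test helper itself; get_state(), the timeouts, and the exception type are assumptions:

import time

class WaitsForState(object):
    # Hypothetical base class illustrating a wait_for_state-style helper.
    def get_state(self):
        raise NotImplementedError  # e.g. fetch job/update status from the API

    def wait_for_state(self, goal_state, failed_state=None,
                       timeout_secs=300, poll_secs=2):
        deadline = time.time() + timeout_secs
        while time.time() < deadline:
            state = self.get_state()
            if state == goal_state:
                return
            if failed_state is not None and state == failed_state:
                raise AssertionError(
                    "reached %s while waiting for %s" % (state, goal_state))
            time.sleep(poll_secs)
        raise AssertionError("timed out waiting for state %s" % goal_state)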

test_processdispatcher_service.py

Source: test_processdispatcher_service.py (GitHub)

# Copyright 2013 University of Chicago

import logging
import unittest
from collections import defaultdict
import random
import time
import uuid
import threading
from socket import timeout

from mock import patch
from dashi import bootstrap, DashiConnection

import epu.tevent as tevent
from epu.dashiproc.processdispatcher import ProcessDispatcherService, \
    ProcessDispatcherClient, SubscriberNotifier
from epu.processdispatcher.test.mocks import FakeEEAgent, MockEPUMClient, \
    MockNotifier, get_definition, get_domain_config, nosystemrestart_process_config, \
    minimum_time_between_starts_config
from epu.processdispatcher.engines import EngineRegistry, domain_id_from_engine
from epu.states import InstanceState, ProcessState
from epu.processdispatcher.store import ProcessRecord, ProcessDispatcherStore, ProcessDispatcherZooKeeperStore
from epu.processdispatcher.modes import QueueingMode, RestartMode
from epu.test import ZooKeeperTestMixin
from epu.test.util import wait


log = logging.getLogger(__name__)


class ProcessDispatcherServiceTests(unittest.TestCase):

    amqp_uri = "amqp://guest:[email protected]//"

    engine_conf = {'engine1': {'slots': 4, 'base_need': 1},
                   'engine2': {'slots': 4}, 'engine3': {'slots': 4},
                   'engine4': {
                       'slots': 4, 'heartbeat_warning': 10, 'heartbeat_missing': 20,
                       'heartbeat_period': 1, 'base_need': 1}}
    default_engine = 'engine1'
    process_engines = {'a.b.C': 'engine3', 'a.b': 'engine2'}

    def setUp(self):

        DashiConnection.consumer_timeout = 0.01
        self.definition_id = "pd_definition"
        self.definition = get_definition()
        self.epum_client = MockEPUMClient()
        self.epum_client.add_domain_definition(self.definition_id, self.definition)

        self.store = self.setup_store()

        self.sysname = "test-sysname%s" % uuid.uuid4().hex

        self.start_pd()
        self.client = ProcessDispatcherClient(self.pd.dashi, self.pd_name)

        def waiter():
            try:
                self.client.list_definitions()
                return True
            except timeout:
                return False

        wait(waiter)

        self.process_definition_id = uuid.uuid4().hex
        self.client.create_definition(self.process_definition_id, "dtype",
            executable={"module": "some.module", "class": "SomeClass"},
            name="my_process")

        self.eeagents = {}
        self.eeagent_threads = {}

    def tearDown(self):
        self.stop_pd()
        self.teardown_store()
        self._kill_all_eeagents()

    def start_pd(self):
        self.registry = EngineRegistry.from_config(self.engine_conf,
            default=self.default_engine, process_engines=self.process_engines)
        self.notifier = MockNotifier()

        self.pd = ProcessDispatcherService(amqp_uri=self.amqp_uri,
            registry=self.registry, epum_client=self.epum_client,
            notifier=self.notifier, definition_id=self.definition_id,
            domain_config=get_domain_config(), store=self.store,
            sysname=self.sysname)

        self.pd_name = self.pd.topic
        self.pd_thread = tevent.spawn(self.pd.start)
        self.pd.ready_event.wait(60)

    def stop_pd(self):
        self.pd.stop()
        self.pd_thread.join()

    def setup_store(self):
        return ProcessDispatcherStore()

    def teardown_store(self):
        self.store.shutdown()

    def _spawn_eeagent(self, node_id, slot_count, heartbeat_dest=None):
        if heartbeat_dest is None:
            heartbeat_dest = self.pd_name

        agent_name = "eeagent_%s" % uuid.uuid4()
        dashi = bootstrap.dashi_connect(agent_name, sysname=self.sysname,
                                        amqp_uri=self.amqp_uri)

        agent = FakeEEAgent(agent_name, dashi, heartbeat_dest, node_id, slot_count)
        self.eeagents[agent_name] = agent
        self.eeagent_threads[agent_name] = tevent.spawn(agent.start)
        agent.ready_event.wait(10)

        agent.send_heartbeat()
        return agent

    def _kill_all_eeagents(self):
        for eeagent in self.eeagents.itervalues():
            eeagent.dashi.cancel()
            eeagent.dashi.disconnect()
126
127        for eeagent_thread in self.eeagent_threads.itervalues():
128            eeagent_thread.join()
129
130        self.eeagents.clear()
131
132    def _get_eeagent_for_process(self, upid):
133        state = self.client.dump()
134        process = state['processes'][upid]
135
136        attached = process['assigned']
137        if attached is None:
138            return None
139
140        return self.eeagents[attached]
141
142    def _assert_pd_dump(self, fun, *args, **kwargs):
143        state = self.client.dump()
144        log.debug("PD state: %s", state)
145        fun(state, *args, **kwargs)
146
147    def assert_one_reconfigure(self, domain_id=None, preserve_n=None, retirees=None):
148        if domain_id is not None:
149            reconfigures = self.epum_client.reconfigures[domain_id]
150        else:
151            reconfigures = self.epum_client.reconfigures.values()
152            self.assertTrue(reconfigures)
153            reconfigures = reconfigures[0]
154        self.assertEqual(len(reconfigures), 1)
155        reconfigure = reconfigures[0]
156        engine_conf = reconfigure['engine_conf']
157        if preserve_n is not None:
158            self.assertEqual(engine_conf['preserve_n'], preserve_n)
159        if retirees is not None:
160            retirables = engine_conf.get('retirable_nodes', [])
161            self.assertEqual(set(retirables), set(retirees))
162
163    max_tries = 10
164
165    def _wait_assert_pd_dump(self, fun, *args, **kwargs):
166        tries = 0
167        while True:
168            try:
169                self._assert_pd_dump(fun, *args, **kwargs)
170            except Exception:
171                tries += 1
172                if tries == self.max_tries:
173                    log.error("PD state assertion failing after %d attempts",
174                              tries)
175                    raise
176            else:
177                return
178            time.sleep(0.05)
179
180    def _wait_for_one_reconfigure(self, domain_id=None):
181        tries = 0
182        while True:
183            if domain_id is not None:
184                reconfigures = self.epum_client.reconfigures[domain_id]
185            else:
186                reconfigures = self.epum_client.reconfigures.values()
187                self.assertTrue(reconfigures)
188                reconfigures = reconfigures[0]
189
190            try:
191                self.assertEqual(len(reconfigures), 1)
192            except Exception:
193                tries += 1
194                if tries == self.max_tries:
195                    log.error("Waiting for reconfigure failing after %d attempts",
196                              tries)
197                    raise
198            else:
199                return
200            time.sleep(0.05)
201
202    def test_basics(self):
203
204        # create some fake nodes and tell PD about them
205        nodes = ["node1", "node2", "node3"]
206        domain_id = domain_id_from_engine("engine1")
207
208        for node in nodes:
209            self.client.node_state(node, domain_id, InstanceState.RUNNING)
210
211        # PD knows about these nodes but hasn't gotten a heartbeat yet
212
213        # spawn the eeagents and tell them all to heartbeat
214        for node in nodes:
215            self._spawn_eeagent(node, 4)
216
217        def assert_all_resources(state):
218            eeagent_nodes = set()
219            for resource in state['resources'].itervalues():
220                eeagent_nodes.add(resource['node_id'])
221            self.assertEqual(set(nodes), eeagent_nodes)
222
223        self._wait_assert_pd_dump(assert_all_resources)
224
225        procs = ["proc1", "proc2", "proc3"]
226        rounds = dict((upid, 0) for upid in procs)
227        for proc in procs:
228            procstate = self.client.schedule_process(proc,
229                self.process_definition_id, None, restart_mode=RestartMode.NEVER)
230            self.assertEqual(procstate['upid'], proc)
231
232        processes_left = 3
233
234        self._wait_assert_pd_dump(self._assert_process_distribution,
235                                  agent_counts=[processes_left])
236
237        # now terminate one process
238        todie = procs.pop()
239        procstate = self.client.terminate_process(todie)
240        self.assertEqual(procstate['upid'], todie)
241
242        processes_left = 2
243
244        self._wait_assert_pd_dump(self._assert_process_distribution,
245                                        agent_counts=[processes_left])
246
247        def assert_process_rounds(state):
248            for upid, expected_round in rounds.iteritems():
249                self.assertEqual(state['processes'][upid]['round'],
250                                 expected_round)
251
252        self._wait_assert_pd_dump(assert_process_rounds)
253
254        # "kill" a process in the backend eeagent
255        fail_upid = procs[0]
256        agent = self._get_eeagent_for_process(fail_upid)
257
258        agent.fail_process(fail_upid)
259
260        processes_left = 1
261
262        self._wait_assert_pd_dump(assert_process_rounds)
263        self._wait_assert_pd_dump(self._assert_process_distribution,
264                                  agent_counts=[processes_left])
265
266    def test_terminate_when_waiting(self):
267        """test_terminate_when_waiting
268        submit a proc, wait for it to get to a waiting state, epum
269        should then be configured to scale up, then terminate it.
270        EPUM should be reconfigured to scale down at that point.
271        """
272
273        domain_id = domain_id_from_engine("engine2")
274        self._wait_for_one_reconfigure(domain_id)
275        self.assert_one_reconfigure(domain_id, 0, [])
276        self.epum_client.clear()
277
278        procs = ["proc1", "proc2", "proc3"]
279        for proc in procs:
280            procstate = self.client.schedule_process(proc,
281                self.process_definition_id, None, restart_mode=RestartMode.NEVER,
282                execution_engine_id="engine2")
283            self.assertEqual(procstate['upid'], proc)
284
285        self._wait_assert_pd_dump(self._assert_process_distribution,
286                                  queued=procs)
287
288        self._wait_for_one_reconfigure(domain_id)
289        self.assert_one_reconfigure(domain_id, 1, [])
290        self.epum_client.clear()
291
292        # now terminate one process. Shouldn't have any reconfigs
293        todie = procs.pop()
294        procstate = self.client.terminate_process(todie)
295        self.assertEqual(procstate['upid'], todie)
296
297        self._wait_assert_pd_dump(self._assert_process_distribution,
298                                        queued=procs)
299
300        self.assertEqual(self.epum_client.reconfigures, {})
301
302        # now kill the remaining procs. we should see a scale down
303        for todie in procs:
304            procstate = self.client.terminate_process(todie)
305            self.assertEqual(procstate['upid'], todie)
306
307        self._wait_assert_pd_dump(self._assert_process_distribution,
308                                        queued=[])
309
310        self._wait_for_one_reconfigure(domain_id)
311        self.assert_one_reconfigure(domain_id, 0, [])
312        self.epum_client.clear()
313
314    def test_multiple_ee_per_node(self):
315
316        # create some fake nodes and tell PD about them
317        nodes = ["node1", "node2", "node3", "node4"]
318        domain_id = domain_id_from_engine("engine1")
319
320        for node in nodes:
321            self.client.node_state(node, domain_id, InstanceState.RUNNING)
322
323        # PD knows about these nodes but hasn't gotten a heartbeat yet
324
325        # spawn two eeagents per node and tell them all to heartbeat
326        for node in nodes:
327            self._spawn_eeagent(node, 4)
328            self._spawn_eeagent(node, 4)
329
330        def assert_all_resources(state):
331            eeagent_nodes = set()
332            for resource in state['resources'].itervalues():
333                eeagent_nodes.add(resource['node_id'])
334            self.assertEqual(set(nodes), eeagent_nodes)
335
336        self._wait_assert_pd_dump(assert_all_resources)
337
338        procs = ["proc%s" % i for i in range(33)]
339
340        def start_procs(n):
341            started = []
342            for proc in procs[:n]:
343                print "Starting proc %s" % proc
344                procstate = self.client.schedule_process(proc,
345                self.process_definition_id, None)
346                self.assertEqual(procstate['upid'], proc)
347                started.append(proc)
348            procs[:] = procs[n:]
349            return started
350
351        # start the first 5 processes. they should end up spread across the
352        # two eeagents on just one node
353        first_started = start_procs(5)
354
355        agent_dist = [2, 3, 0, 0, 0, 0, 0, 0]
356        node_dist = [5, 0, 0, 0]
357
358        self._wait_assert_pd_dump(self._assert_process_distribution,
359            agent_counts=agent_dist, node_counts=node_dist)
360
361        # start 3 more. should fill up that first node
362        start_procs(3)
363
364        agent_dist = [4, 4, 0, 0, 0, 0, 0, 0]
365        node_dist = [8, 0, 0, 0]
366        self._wait_assert_pd_dump(self._assert_process_distribution,
367            agent_counts=agent_dist, node_counts=node_dist)
368
369        # two more should cause us to spread to a second node with a process
370        # per eeagent
371        start_procs(2)
372        agent_dist = [4, 4, 1, 1, 0, 0, 0, 0]
373        node_dist = [8, 2, 0, 0]
374        self._wait_assert_pd_dump(self._assert_process_distribution,
375            agent_counts=agent_dist, node_counts=node_dist)
376
377        # now kill a process on the first node
378        self.client.terminate_process(first_started[0])
379        agent_dist = [3, 4, 1, 1, 0, 0, 0, 0]
380        node_dist = [7, 2, 0, 0]
381        self._wait_assert_pd_dump(self._assert_process_distribution,
382            agent_counts=agent_dist, node_counts=node_dist)
383
384        # and schedule two more. One new process should end up in the vacated
385        # slot. The other should go to the second node.
386        start_procs(2)
387        agent_dist = [4, 4, 2, 1, 0, 0, 0, 0]
388        node_dist = [8, 3, 0, 0]
389        self._wait_assert_pd_dump(self._assert_process_distribution,
390            agent_counts=agent_dist, node_counts=node_dist)
391
392        # finally start the remaining 24 processes. They should fill up
393        # all slots on all agents.
394        start_procs(24)
395        agent_dist = [4, 4, 4, 4, 4, 4, 4, 4]
396        node_dist = [8, 8, 8, 8]
397        self._wait_assert_pd_dump(self._assert_process_distribution,
398            agent_counts=agent_dist, node_counts=node_dist)
399
400    def test_requested_ee(self):
401        self.client.node_state("node1", domain_id_from_engine("engine1"),
402            InstanceState.RUNNING)
403        self._spawn_eeagent("node1", 4)
404
405        queued = []
406
407        proc1_queueing_mode = QueueingMode.ALWAYS
408
409        # ensure that procs that request a nonexistent engine id get queued
410        self.client.schedule_process("proc1", self.process_definition_id,
411            queueing_mode=proc1_queueing_mode, execution_engine_id="engine2")
412
413        # proc1 should be queued
414        queued.append("proc1")
415        self._wait_assert_pd_dump(self._assert_process_distribution,
416                                        queued=queued)
417
418        # ensure that procs that request an existing engine id run
419        self.client.schedule_process("proc2", self.process_definition_id,
420                queueing_mode=proc1_queueing_mode, execution_engine_id="engine1")
421
422        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
423        self._wait_assert_pd_dump(self._assert_process_states,
424                ProcessState.RUNNING, ["proc2"])
425
426        # ensure that procs that don't specify an engine id run
427        self.client.schedule_process("proc3", self.process_definition_id,
428                queueing_mode=proc1_queueing_mode)
429
430        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
431        self._wait_assert_pd_dump(self._assert_process_states,
432                ProcessState.RUNNING, ["proc3"])
433
434        # now add an engine for proc1 and it should be scheduled
435        self.client.node_state("node2", domain_id_from_engine("engine2"),
436            InstanceState.RUNNING)
437        self._spawn_eeagent("node2", 4)
438
439        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
440        self._wait_assert_pd_dump(self._assert_process_states,
441            ProcessState.RUNNING, ["proc1"])
442
443        # now launch another process for engine2. it should be scheduled too
444        self.client.schedule_process("proc4", self.process_definition_id,
445            queueing_mode=QueueingMode.NEVER, execution_engine_id="engine2")
446        self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
447        self._wait_assert_pd_dump(self._assert_process_states,
448            ProcessState.RUNNING, ["proc4"])
449
450    def test_default_ee(self):
451        self.client.node_state("node1", domain_id_from_engine("engine1"),
452            InstanceState.RUNNING)
453        self._spawn_eeagent("node1", 4)
454
455        self.client.node_state("node2", domain_id_from_engine("engine2"),
456            InstanceState.RUNNING)
457        self._spawn_eeagent("node2", 4)
458
459        # fill up all 4 slots on engine1 agent and launch one more proc
460        for upid in ['p1', 'p2', 'p3', 'p4', 'p5']:
461            self.client.schedule_process(upid, self.process_definition_id,
462                queueing_mode=QueueingMode.ALWAYS)
463
464        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
465        self.notifier.wait_for_state('p2', ProcessState.RUNNING)
466        self.notifier.wait_for_state('p3', ProcessState.RUNNING)
467        self.notifier.wait_for_state('p4', ProcessState.RUNNING)
468
469        # p5 should be queued since it is not compatible with engine2
470        self.notifier.wait_for_state('p5', ProcessState.WAITING)
471
472        # now schedule p6 directly to engine2
473        self.client.schedule_process("p6", self.process_definition_id,
474            queueing_mode=QueueingMode.ALWAYS, execution_engine_id="engine2")
475        self.notifier.wait_for_state('p6', ProcessState.RUNNING)
476
477        # add another eeagent for engine1, p5 should run
478        self.client.node_state("node3", domain_id_from_engine("engine1"),
479            InstanceState.RUNNING)
480        self._spawn_eeagent("node3", 4)
481        self.notifier.wait_for_state('p5', ProcessState.RUNNING)
482
483    def test_process_engine_map(self):
484        def1 = uuid.uuid4().hex
485        self.client.create_definition(def1, "dtype",
486            executable={"module": "a.b", "class": "C"},
487            name="my_process")
488        def2 = uuid.uuid4().hex
489        self.client.create_definition(def2, "dtype",
490            executable={"module": "a.b", "class": "D"},
491            name="my_process")
492        def3 = uuid.uuid4().hex
493        self.client.create_definition(def3, "dtype",
494            executable={"module": "a", "class": "B"},
495            name="my_process")
496
497        self.client.node_state("node1", domain_id_from_engine("engine1"),
498            InstanceState.RUNNING)
499        eeagent1 = self._spawn_eeagent("node1", 4)
500
501        self.client.node_state("node2", domain_id_from_engine("engine2"),
502            InstanceState.RUNNING)
503        eeagent2 = self._spawn_eeagent("node2", 4)
504
505        self.client.node_state("node3", domain_id_from_engine("engine3"),
506            InstanceState.RUNNING)
507        eeagent3 = self._spawn_eeagent("node3", 4)
508
509        self.client.schedule_process("proc1", def1)
510        self.client.schedule_process("proc2", def2)
511        self.client.schedule_process("proc3", def3)
512
513        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
514        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
515        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
516
517        proc1 = self.client.describe_process("proc1")
518        self.assertEqual(proc1['assigned'], eeagent3.name)
519
520        proc2 = self.client.describe_process("proc2")
521        self.assertEqual(proc2['assigned'], eeagent2.name)
522
523        proc3 = self.client.describe_process("proc3")
524        self.assertEqual(proc3['assigned'], eeagent1.name)
525
526    def test_node_exclusive(self):
527        node = "node1"
528        domain_id = domain_id_from_engine('engine1')
529        node_properties = dict(engine="fedora")
530        self.client.node_state(node, domain_id, InstanceState.RUNNING,
531                node_properties)
532
533        self._spawn_eeagent(node, 4)
534
535        exclusive_attr = "hamsandwich"
536        queued = []
537
538        proc1_queueing_mode = QueueingMode.ALWAYS
539
540        # Process should be scheduled, since no other procs have its
541        # exclusive attribute
542        self.client.schedule_process("proc1", self.process_definition_id,
543                queueing_mode=proc1_queueing_mode,
544                node_exclusive=exclusive_attr)
545
546        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
547        self._wait_assert_pd_dump(self._assert_process_states,
548                ProcessState.RUNNING, ["proc1"])
549
550        # Process should be queued, because proc1 has the same attribute
551        self.client.schedule_process("proc2", self.process_definition_id,
552                queueing_mode=proc1_queueing_mode,
553                node_exclusive=exclusive_attr)
554
555        queued.append("proc2")
556        self._wait_assert_pd_dump(self._assert_process_distribution,
557                                        queued=queued)
558
559        # Now kill the first process, and proc2 should run.
560        self.client.terminate_process("proc1")
561        queued.remove("proc2")
562        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
563        self._wait_assert_pd_dump(self._assert_process_states,
564                ProcessState.RUNNING, ["proc2"])
565
566        # Process should be queued, because proc2 has the same attribute
567        self.client.schedule_process("proc3", self.process_definition_id,
568                queueing_mode=proc1_queueing_mode,
569                node_exclusive=exclusive_attr)
570
571        queued.append("proc3")
572        self._wait_assert_pd_dump(self._assert_process_distribution,
573                                        queued=queued)
574
575        # Process should be scheduled, since no other procs have its
576        # exclusive attribute
577        other_exclusive_attr = "hummussandwich"
578        self.client.schedule_process("proc4", self.process_definition_id,
579                queueing_mode=proc1_queueing_mode,
580                node_exclusive=other_exclusive_attr)
581
582        self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
583        self._wait_assert_pd_dump(self._assert_process_states,
584                ProcessState.RUNNING, ["proc4"])
585
586        # Once we start another node, the waiting process should run
587        node = "node2"
588        node_properties = dict(engine="fedora")
589        self.client.node_state(node, domain_id, InstanceState.RUNNING,
590                node_properties)
591
592        self._spawn_eeagent(node, 4)
593
594        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
595        self._wait_assert_pd_dump(self._assert_process_states,
596                ProcessState.RUNNING, ["proc3"])
597
598    def test_node_exclusive_bug(self):
599        slots = 2
600
601        node_1 = "node1"
602        domain_id = domain_id_from_engine('engine1')
603        node_properties = dict(engine="fedora")
604        self.client.node_state(node_1, domain_id, InstanceState.RUNNING,
605                node_properties)
606        self._spawn_eeagent(node_1, slots)
607
608        node_2 = "node2"
609        domain_id = domain_id_from_engine('engine1')
610        node_properties = dict(engine="fedora")
611        self.client.node_state(node_2, domain_id, InstanceState.RUNNING,
612                node_properties)
613        self._spawn_eeagent(node_2, slots)
614
615        node_3 = "node3"
616        domain_id = domain_id_from_engine('engine1')
617        node_properties = dict(engine="fedora")
618        self.client.node_state(node_3, domain_id, InstanceState.RUNNING,
619                node_properties)
620        self._spawn_eeagent(node_3, slots)
621
622        node_4 = "node4"
623        domain_id = domain_id_from_engine('engine1')
624        node_properties = dict(engine="fedora")
625        self.client.node_state(node_4, domain_id, InstanceState.RUNNING,
626                node_properties)
627        self._spawn_eeagent(node_4, slots)
628
629        pydap_xattr = "pydap"
630        service_gateway_xattr = "service_gateway"
631
632        queueing_mode = QueueingMode.START_ONLY
633
634        # Process should be scheduled, since no other procs have its
635        # exclusive attribute
636        pydap_xattr_procs = []
637        service_gateway_xattr_procs = []
638
639        proc_1 = "proc_1"
640        self.client.schedule_process(proc_1, self.process_definition_id,
641                queueing_mode=queueing_mode,
642                node_exclusive=pydap_xattr)
643        pydap_xattr_procs.append(proc_1)
644
645        proc_2 = "proc_2"
646        self.client.schedule_process(proc_2, self.process_definition_id,
647                queueing_mode=queueing_mode,
648                node_exclusive=service_gateway_xattr)
649        service_gateway_xattr_procs.append(proc_2)
650
651        proc_3 = "proc_3"
652        self.client.schedule_process(proc_3, self.process_definition_id,
653                queueing_mode=queueing_mode,
654                node_exclusive=service_gateway_xattr)
655        service_gateway_xattr_procs.append(proc_3)
656
657        proc_4 = "proc_4"
658        self.client.schedule_process(proc_4, self.process_definition_id,
659                queueing_mode=queueing_mode,
660                node_exclusive=pydap_xattr)
661        pydap_xattr_procs.append(proc_4)
662
663        proc_5 = "proc_5"
664        self.client.schedule_process(proc_5, self.process_definition_id,
665                queueing_mode=queueing_mode,
666                node_exclusive=service_gateway_xattr)
667        service_gateway_xattr_procs.append(proc_5)
668
669        proc_6 = "proc_6"
670        self.client.schedule_process(proc_6, self.process_definition_id,
671                queueing_mode=queueing_mode,
672                node_exclusive=pydap_xattr)
673        pydap_xattr_procs.append(proc_6)
674
675        proc_7 = "proc_7"
676        self.client.schedule_process(proc_7, self.process_definition_id,
677                queueing_mode=queueing_mode,
678                node_exclusive=service_gateway_xattr)
679        service_gateway_xattr_procs.append(proc_7)
680
681        proc_8 = "proc_8"
682        self.client.schedule_process(proc_8, self.process_definition_id,
683                queueing_mode=queueing_mode,
684                node_exclusive=pydap_xattr)
685        pydap_xattr_procs.append(proc_8)
686
687        for proc in (pydap_xattr_procs + service_gateway_xattr_procs):
688            self.notifier.wait_for_state(proc, ProcessState.RUNNING)
689            self._wait_assert_pd_dump(self._assert_process_states,
690                    ProcessState.RUNNING, [proc])
691
692        self._wait_assert_pd_dump(self._assert_node_exclusive)
693
694        self.client.terminate_process(proc_8)
695
696        self._wait_assert_pd_dump(self._assert_node_exclusive)
697
698    def test_node_exclusive_multiple_eeagents(self):
699        node = "node1"
700        domain_id = domain_id_from_engine('engine1')
701
702        node_properties = dict(engine="fedora")
703        self.client.node_state(node, domain_id, InstanceState.RUNNING,
704                node_properties)
705
706        self._spawn_eeagent(node, 4)
707        self._spawn_eeagent(node, 4)
708
709        exclusive_attr = "hamsandwich"
710        queued = []
711
712        proc1_queueing_mode = QueueingMode.ALWAYS
713
714        # Process should be scheduled, since no other procs have its
715        # exclusive attribute
716        self.client.schedule_process("proc1", self.process_definition_id,
717                queueing_mode=proc1_queueing_mode,
718                node_exclusive=exclusive_attr)
719
720        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
721        self._wait_assert_pd_dump(self._assert_process_states,
722                ProcessState.RUNNING, ["proc1"])
723
724        # Process should be queued, because proc1 has the same attribute
725        self.client.schedule_process("proc2", self.process_definition_id,
726                queueing_mode=proc1_queueing_mode,
727                node_exclusive=exclusive_attr)
728
729        queued.append("proc2")
730        self._wait_assert_pd_dump(self._assert_process_distribution,
731                                        queued=queued)
732
733        # Now kill the first process, and proc2 should run.
734        self.client.terminate_process("proc1")
735        queued.remove("proc2")
736        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
737        self._wait_assert_pd_dump(self._assert_process_states,
738                ProcessState.RUNNING, ["proc2"])
739
740        # Process should be queued, because proc2 has the same attribute
741        self.client.schedule_process("proc3", self.process_definition_id,
742                queueing_mode=proc1_queueing_mode,
743                node_exclusive=exclusive_attr)
744
745        queued.append("proc3")
746        self._wait_assert_pd_dump(self._assert_process_distribution,
747                                        queued=queued)
748
749        # Process should be scheduled, since no other procs have its
750        # exclusive attribute
751        other_exclusive_attr = "hummussandwich"
752        self.client.schedule_process("proc4", self.process_definition_id,
753                queueing_mode=proc1_queueing_mode,
754                node_exclusive=other_exclusive_attr)
755
756        self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
757        self._wait_assert_pd_dump(self._assert_process_states,
758                ProcessState.RUNNING, ["proc4"])
759
760        # Once we start another node, the waiting process should run
761        node = "node2"
762        node_properties = dict(engine="fedora")
763        self.client.node_state(node, domain_id, InstanceState.RUNNING,
764                node_properties)
765
766        self._spawn_eeagent(node, 4)
767
768        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
769        self._wait_assert_pd_dump(self._assert_process_states,
770                ProcessState.RUNNING, ["proc3"])
771
772    def test_queueing(self):
773        # submit some processes before there are any resources available
774
775        procs = ["proc1", "proc2", "proc3", "proc4", "proc5"]
776        for proc in procs:
777            procstate = self.client.schedule_process(proc, self.process_definition_id)
778            self.assertEqual(procstate['upid'], proc)
779
780        for proc in procs:
781            self.notifier.wait_for_state(proc, ProcessState.WAITING)
782        self._wait_assert_pd_dump(self._assert_process_states,
783                                        ProcessState.WAITING, procs)
784
785        # add 2 nodes and a resource that supports 4 processes
786        nodes = ["node1", "node2"]
787        domain_id = domain_id_from_engine('engine1')
788
789        for node in nodes:
790            self.client.node_state(node, domain_id, InstanceState.RUNNING)
791
792        self._spawn_eeagent(nodes[0], 4)
793
794        for proc in procs[:4]:
795            self.notifier.wait_for_state(proc, ProcessState.RUNNING)
796        self._wait_assert_pd_dump(self._assert_process_states,
797                                        ProcessState.RUNNING, procs[:4])
798        for proc in procs[4:]:
799            self.notifier.wait_for_state(proc, ProcessState.WAITING)
800        self._wait_assert_pd_dump(self._assert_process_states,
801                                        ProcessState.WAITING, procs[4:])
802
803        # stand up a resource on the second node to support the other process
804        self._spawn_eeagent(nodes[1], 4)
805
806        # all processes should now be running
807        for proc in procs:
808            self.notifier.wait_for_state(proc, ProcessState.RUNNING)
809        self._wait_assert_pd_dump(self._assert_process_states,
810                                        ProcessState.RUNNING, procs)
811
812    def _assert_process_states(self, dump, expected_state, upids):
813        for upid in upids:
814            process = dump['processes'][upid]
815            assert process['state'] == expected_state, "%s: %s, expected %s!" % (
816                upid, process['state'], expected_state)
817
818    def test_node_death(self):
819        # set up two nodes with 4 slots each
820
821        nodes = ['node1', 'node2']
822        domain_id = domain_id_from_engine('engine1')
823
824        for node in nodes:
825            self.client.node_state(node, domain_id, InstanceState.RUNNING)
826
827        for node in nodes:
828            self._spawn_eeagent(node, 4)
829
830        # 8 total slots are available, schedule 6 processes
831
832        procs = ['proc' + str(i + 1) for i in range(6)]
833
834        # schedule the first process to never restart. it shouldn't come back.
835        self.client.schedule_process(procs[0], self.process_definition_id,
836            restart_mode=RestartMode.NEVER)
837
838        # and the second process to restart on abnormal termination. it should
839        # come back.
840        self.client.schedule_process(procs[1], self.process_definition_id,
841            restart_mode=RestartMode.ABNORMAL)
842
843        for proc in procs[2:]:
844            self.client.schedule_process(proc, self.process_definition_id)
845
846        self._wait_assert_pd_dump(self._assert_process_distribution,
847                                        node_counts=[4, 2],
848                                        queued_count=0)
849
850        # now kill one node
851        log.debug("killing node %s", nodes[0])
852        self.client.node_state(nodes[0], domain_id, InstanceState.TERMINATING)
853
854        # 5 processes should be rescheduled. since we have 5 processes and only
855        # 4 slots, 1 should be queued
856
857        self._wait_assert_pd_dump(self._assert_process_distribution,
858                                  node_counts=[4],
859                                  queued_count=1)
860
861        # ensure that the correct process was not rescheduled
862        self.notifier.wait_for_state(procs[0], ProcessState.FAILED)
863
864    def _assert_process_distribution(self, dump, nodes=None, node_counts=None,
865                                     agents=None, agent_counts=None,
866                                     queued=None, queued_count=None,
867                                     rejected=None, rejected_count=None):
868        # Assert the distribution of processes among nodes
869        # node and agent counts are given as sequences of integers which are not
870        # specific to a named node. So specifying node_counts=[4,3] will match
871        # as long as you have 4 processes assigned to one node and 3 to another,
872        # regardless of the node name
873        found_rejected = set()
874        found_queued = set()
875        found_node = defaultdict(set)
876        found_assigned = defaultdict(set)
877        for process in dump['processes'].itervalues():
878            upid = process['upid']
879            assigned = process['assigned']
880
881            if process['state'] == ProcessState.WAITING:
882                found_queued.add(upid)
883            elif process['state'] == ProcessState.REJECTED:
884                found_rejected.add(upid)
885            elif process['state'] == ProcessState.RUNNING:
886                resource = dump['resources'].get(assigned)
887                self.assertIsNotNone(resource)
888                node_id = resource['node_id']
889                found_node[node_id].add(upid)
890                found_assigned[assigned].add(upid)
891
892        print "Queued: %s\nRejected: %s\n" % (queued, rejected)
893        print "Found Queued: %s\nFound Rejected: %s\n" % (found_queued, found_rejected)
894
895        if queued is not None:
896            self.assertEqual(set(queued), found_queued)
897
898        if queued_count is not None:
899            self.assertEqual(len(found_queued), queued_count)
900
901        if rejected is not None:
902            self.assertEqual(set(rejected), found_rejected)
903
904        if rejected_count is not None:
905            self.assertEqual(len(found_rejected), rejected_count)
906
907        if agents is not None:
908            self.assertEqual(set(agents.keys()), set(found_assigned.keys()))
909            for ee_id, processes in found_assigned.iteritems():
910                self.assertEqual(set(agents[ee_id]), processes)
911
912        if agent_counts is not None:
913            assigned_lengths = [len(s) for s in found_assigned.itervalues()]
914            # omit zero counts
915            agent_counts = [count for count in agent_counts if count != 0]
916            # print "%s =?= %s" % (agent_counts, assigned_lengths)
917            self.assertEqual(sorted(assigned_lengths), sorted(agent_counts))
918
919        if nodes is not None:
920            self.assertEqual(set(nodes.keys()), set(found_node.keys()))
921            for node_id, processes in found_node.iteritems():
922                self.assertEqual(set(nodes[node_id]), processes)
923
924        if node_counts is not None:
925            node_lengths = [len(s) for s in found_node.itervalues()]
926            # omit zero counts
927            node_counts = [count for count in node_counts if count != 0]
928            self.assertEqual(sorted(node_lengths), sorted(node_counts))
929
930    def _assert_node_exclusive(self, dump):
931        """assert that processes are distributed in a way consistent
932        with the node exclusive properties of those processes
933        """
934        exclusive_dist = {}
935        for proc_id, proc in dump['processes'].iteritems():
936            if proc['state'] == '700-TERMINATED':
937                continue
938            assigned = proc.get('assigned')
939            assert assigned is not None, proc
940
941            node_exclusive = proc.get('node_exclusive')
942            assert node_exclusive is not None
943
944            if exclusive_dist.get(assigned) is None:
945                exclusive_dist[assigned] = []
946            exclusive_dist[assigned].append(node_exclusive)
947            exclusive_dist[assigned].sort()
948
949        for node, exclusives in exclusive_dist.iteritems():
950            assert len(exclusives) == len(set(exclusives))
951
952        exclusive_dist_nodes = {}
953        exclusive_dist_resources = {}
954        for node_id, node in dump['nodes'].iteritems():
955            exclusive_dist_nodes[node_id] = node['node_exclusive']
956            exclusive_dist_nodes[node_id].sort()
957            for resource in node['resources']:
958                exclusive_dist_resources[resource] = node['node_exclusive']
959                exclusive_dist_resources[resource].sort()
960
961        for node, exclusives in exclusive_dist_nodes.iteritems():
962            assert len(exclusives) == len(set(exclusives))
963
964        print "nodes: %s" % exclusive_dist_nodes
965        print "resources: %s" % exclusive_dist_resources
966        print "proc: %s" % exclusive_dist
967        assert exclusive_dist == exclusive_dist_resources, "%s != %s" % (exclusive_dist, exclusive_dist_resources)
968        return exclusive_dist
969
970    def test_constraints(self):
971        nodes = ['node1', 'node2']
972        domain_id = domain_id_from_engine('engine1')
973        node1_properties = dict(hat_type="fedora")
974        node2_properties = dict(hat_type="bowler")
975
976        self.client.node_state(nodes[0], domain_id, InstanceState.RUNNING,
977            node1_properties)
978        self._spawn_eeagent(nodes[0], 4)
979
980        proc1_constraints = dict(hat_type="fedora")
981        proc2_constraints = dict(hat_type="bowler")
982
983        self.client.schedule_process("proc1", self.process_definition_id,
984            constraints=proc1_constraints)
985        self.client.schedule_process("proc2", self.process_definition_id,
986            constraints=proc2_constraints)
987
988        # proc1 should be running on the node/agent, proc2 queued
989        self._wait_assert_pd_dump(self._assert_process_distribution,
990                                        nodes=dict(node1=["proc1"]),
991                                        queued=["proc2"])
992
993        # launch another eeagent on a node whose properties match proc2's constraints
994        self.client.node_state(nodes[1], domain_id, InstanceState.RUNNING,
995            node2_properties)
996        self._spawn_eeagent(nodes[1], 4)
997
998        self._wait_assert_pd_dump(self._assert_process_distribution,
999                                        nodes=dict(node1=["proc1"],
1000                                                   node2=["proc2"]),
1001                                        queued=[])
1002
1003    def test_queue_mode(self):
1004
1005        constraints = dict(hat_type="fedora")
1006        queued = []
1007        rejected = []
1008
1009        # Test QueueingMode.NEVER
1010        proc1_queueing_mode = QueueingMode.NEVER
1011
1012        self.client.schedule_process("proc1", self.process_definition_id,
1013            constraints=constraints, queueing_mode=proc1_queueing_mode)
1014
1015        # proc1 should be rejected
1016        rejected.append("proc1")
1017        self._wait_assert_pd_dump(self._assert_process_distribution,
1018                                        rejected=rejected)
1019
1020        # Test QueueingMode.ALWAYS
1021        proc2_queueing_mode = QueueingMode.ALWAYS
1022
1023        self.client.schedule_process("proc2", self.process_definition_id,
1024            constraints=constraints, queueing_mode=proc2_queueing_mode)
1025
1026        # proc2 should be queued
1027        queued.append("proc2")
1028        self._wait_assert_pd_dump(self._assert_process_distribution,
1029                                        queued=queued)
1030
1031        # Test QueueingMode.START_ONLY
1032        proc3_queueing_mode = QueueingMode.START_ONLY
1033        proc3_restart_mode = RestartMode.ALWAYS
1034
1035        self.client.schedule_process("proc3", self.process_definition_id,
1036            constraints=constraints, queueing_mode=proc3_queueing_mode,
1037            restart_mode=proc3_restart_mode)
1038
1039        # proc3 should be queued, since it's start_only
1040        queued.append("proc3")
1041        self._wait_assert_pd_dump(self._assert_process_distribution,
1042                                        queued=queued)
1043
1044        node = "node1"
1045        domain_id = domain_id_from_engine('engine1')
1046
1047        node_properties = dict(hat_type="fedora")
1048        self.client.node_state(node, domain_id, InstanceState.RUNNING,
1049                node_properties)
1050
1051        self._spawn_eeagent(node, 4)
1052
1053        # we created a node, so it should now run
1054        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
1055        self._wait_assert_pd_dump(self._assert_process_states,
1056                ProcessState.RUNNING, ["proc3"])
1057
1058        log.debug("killing node %s", node)
1059        self._kill_all_eeagents()
1060        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
1061
1062        # proc3 should now be rejected, because it's START_ONLY
1063        queued.remove("proc3")
1064        rejected.append("proc3")
1065        self._wait_assert_pd_dump(self._assert_process_distribution,
1066                                        rejected=rejected)
1067
1068        # Test QueueingMode.RESTART_ONLY
1069
1070        # First test that it's rejected if it doesn't start right away
1071        proc4_queueing_mode = QueueingMode.RESTART_ONLY
1072        proc4_restart_mode = RestartMode.ALWAYS
1073
1074        self.client.schedule_process("proc4", self.process_definition_id,
1075            constraints=constraints, queueing_mode=proc4_queueing_mode,
1076            restart_mode=proc4_restart_mode)
1077
1078        # proc4 should be rejected, since it's RESTART_ONLY
1079        rejected.append("proc4")
1080        self._wait_assert_pd_dump(self._assert_process_distribution,
1081                                        rejected=rejected)
1082
1083        # Second test that if a proc starts, it'll get queued after it fails
1084        proc5_queueing_mode = QueueingMode.RESTART_ONLY
1085        proc5_restart_mode = RestartMode.ALWAYS
1086
1087        # Start a node
1088        self.client.node_state(node, domain_id, InstanceState.RUNNING,
1089                node_properties)
1090        self._spawn_eeagent(node, 4)
1091
1092        self.client.schedule_process("proc5", self.process_definition_id,
1093            constraints=constraints, queueing_mode=proc5_queueing_mode,
1094            restart_mode=proc5_restart_mode)
1095
1096        self.notifier.wait_for_state("proc5", ProcessState.RUNNING)
1097        self._wait_assert_pd_dump(self._assert_process_states,
1098                ProcessState.RUNNING, ["proc5"])
1099
1100        log.debug("killing node %s", node)
1101        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
1102        self._kill_all_eeagents()
1103
1104        # proc5 should be queued, since it's RESTART_ONLY
1105        queued.append("proc5")
1106        self._wait_assert_pd_dump(self._assert_process_distribution,
1107                                        queued=queued)
1108
1109    def test_restart_mode_never(self):
1110
1111        constraints = dict(hat_type="fedora")
1112
1113        # Start a node
1114        node = "node1"
1115        domain_id = domain_id_from_engine('engine1')
1116        node_properties = dict(hat_type="fedora")
1117        self.client.node_state(node, domain_id, InstanceState.RUNNING,
1118                node_properties)
1119        eeagent = self._spawn_eeagent(node, 4)
1120
1121        # Test RestartMode.NEVER
1122        proc1_queueing_mode = QueueingMode.ALWAYS
1123        proc1_restart_mode = RestartMode.NEVER
1124
1125        self.client.schedule_process("proc1", self.process_definition_id,
1126            constraints=constraints, queueing_mode=proc1_queueing_mode,
1127            restart_mode=proc1_restart_mode)
1128
1129        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
1130        self._wait_assert_pd_dump(self._assert_process_states,
1131                ProcessState.RUNNING, ["proc1"])
1132
1133        eeagent.fail_process("proc1")
1134
1135        self.notifier.wait_for_state("proc1", ProcessState.FAILED)
1136        self._wait_assert_pd_dump(self._assert_process_states,
1137                ProcessState.FAILED, ["proc1"])
1138
1139    def test_restart_mode_always(self):
1140
1141        constraints = dict(hat_type="fedora")
1142        queued = []
1143
1144        # Start a node
1145        node = "node1"
1146        domain_id = domain_id_from_engine('engine1')
1147        node_properties = dict(hat_type="fedora")
1148        self.client.node_state(node, domain_id, InstanceState.RUNNING,
1149                node_properties)
1150        eeagent = self._spawn_eeagent(node, 4)
1151
1152        # Test RestartMode.ALWAYS
1153        proc2_queueing_mode = QueueingMode.ALWAYS
1154        proc2_restart_mode = RestartMode.ALWAYS
1155
1156        self.client.schedule_process("proc2", self.process_definition_id,
1157            constraints=constraints, queueing_mode=proc2_queueing_mode,
1158            restart_mode=proc2_restart_mode, configuration={'process': {'minimum_time_between_starts': 0.1}})
1159
1160        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
1161        self._wait_assert_pd_dump(self._assert_process_states,
1162                ProcessState.RUNNING, ["proc2"])
1163
1164        eeagent.exit_process("proc2")
1165
1166        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
1167        self._wait_assert_pd_dump(self._assert_process_states,
1168                ProcessState.RUNNING, ["proc2"])
1169
1170        eeagent.fail_process("proc2")
1171
1172        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
1173        self._wait_assert_pd_dump(self._assert_process_states,
1174                ProcessState.RUNNING, ["proc2"])
1175
1176        log.debug("killing node %s", node)
1177        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
1178        self._kill_all_eeagents()
1179
1180        # proc2 should be queued, since there are no more resources
1181        queued.append("proc2")
1182        self._wait_assert_pd_dump(self._assert_process_distribution,
1183                                        queued=queued)
1184
1185    def test_restart_mode_abnormal(self):
1186
1187        constraints = dict(hat_type="fedora")
1188        queued = []
1189
1190        # Start a node
1191        node = "node1"
1192        domain_id = domain_id_from_engine('engine1')
1193        node_properties = dict(hat_type="fedora")
1194        self.client.node_state(node, domain_id, InstanceState.RUNNING,
1195                node_properties)
1196        eeagent = self._spawn_eeagent(node, 4)
1197
1198        # Test RestartMode.ABNORMAL
1199        proc2_queueing_mode = QueueingMode.ALWAYS
1200        proc2_restart_mode = RestartMode.ABNORMAL
1201
1202        self.client.schedule_process("proc2", self.process_definition_id,
1203            constraints=constraints, queueing_mode=proc2_queueing_mode,
1204            restart_mode=proc2_restart_mode)
1205
1206        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
1207        self._wait_assert_pd_dump(self._assert_process_states,
1208                ProcessState.RUNNING, ["proc2"])
1209
1210        eeagent.fail_process("proc2")
1211
1212        # This can be very slow on buildbot, hence the long timeout
1213        self.notifier.wait_for_state("proc2", ProcessState.RUNNING, 60)
1214        self._wait_assert_pd_dump(self._assert_process_states,
1215                ProcessState.RUNNING, ["proc2"])
1216
1217        log.debug("killing node %s", node)
1218        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
1219        self._kill_all_eeagents()
1220
1221        # proc2 should be queued, since there are no more resources
1222        queued.append("proc2")
1223        self._wait_assert_pd_dump(self._assert_process_distribution,
1224                                        queued=queued)
1225
1226        self.client.node_state(node, domain_id, InstanceState.RUNNING,
1227                node_properties)
1228        eeagent = self._spawn_eeagent(node, 4)
1229
1230        self.client.schedule_process("proc1", self.process_definition_id,
1231            constraints=constraints, queueing_mode=proc2_queueing_mode,
1232            restart_mode=proc2_restart_mode)
1233
1234        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
1235
1236        eeagent.exit_process("proc1")
1237
1238        self.notifier.wait_for_state("proc1", ProcessState.EXITED)
1239        self._wait_assert_pd_dump(self._assert_process_states,
1240                ProcessState.EXITED, ["proc1"])
1241
1242    def test_start_count(self):
1243
1244        nodes = ['node1']
1245        domain_id = domain_id_from_engine('engine1')
1246        node1_properties = dict(hat_type="fedora")
1247
1248        self.client.node_state(nodes[0], domain_id, InstanceState.RUNNING,
1249            node1_properties)
1250        self._spawn_eeagent(nodes[0], 4)
1251
1252        proc1_constraints = dict(hat_type="fedora")
1253
1254        self.client.schedule_process("proc1", self.process_definition_id,
1255            constraints=proc1_constraints)
1256
1257        # proc1 should be running on the node/agent
1258        self._wait_assert_pd_dump(self._assert_process_distribution,
1259                                        nodes=dict(node1=["proc1"]))
1260
1261        proc = self.store.get_process(None, "proc1")
1262        self.assertEqual(proc.starts, 1)
1263        time.sleep(2)
1264
1265        self.client.restart_process("proc1")
1266        # proc1 should be running on the node/agent again after the restart
1267        self._wait_assert_pd_dump(self._assert_process_distribution,
1268                                        nodes=dict(node1=["proc1"]))
1269        proc = self.store.get_process(None, "proc1")
1270        self.assertEqual(proc.starts, 2)
1271
1272    def test_minimum_time_between_starts(self):
1273
1274        constraints = dict(hat_type="fedora")
1275
1276        # Start a node
1277        node = "node1"
1278        domain_id = domain_id_from_engine('engine1')
1279        node_properties = dict(hat_type="fedora")
1280        self.client.node_state(node, domain_id, InstanceState.RUNNING,
1281                node_properties)
1282        eeagent = self._spawn_eeagent(node, 4)
1283
1284        # Test RestartMode.ALWAYS
1285        queueing_mode = QueueingMode.ALWAYS
1286        restart_mode = RestartMode.ALWAYS
1287
1288        default_time_to_throttle = 2
1289        time_to_throttle = 10
1290
1291        self.client.schedule_process("proc1", self.process_definition_id,
1292            constraints=constraints, queueing_mode=queueing_mode,
1293            restart_mode=restart_mode)
1294
1295        self.client.schedule_process("proc2", self.process_definition_id,
1296            constraints=constraints, queueing_mode=queueing_mode,
1297            restart_mode=restart_mode,
1298            configuration=minimum_time_between_starts_config(time_to_throttle))
1299
1300        # Processes should start once without delay
1301        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
1302        self._wait_assert_pd_dump(self._assert_process_states,
1303                ProcessState.RUNNING, ["proc1"])
1304
1305        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
1306        self._wait_assert_pd_dump(self._assert_process_states,
1307                ProcessState.RUNNING, ["proc2"])
1308
1309        # Processes should be restarted once without delay
1310        eeagent.exit_process("proc1")
1311        eeagent.exit_process("proc2")
1312
1313        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
1314        self._wait_assert_pd_dump(self._assert_process_states,
1315                ProcessState.RUNNING, ["proc1"])
1316        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
1317        self._wait_assert_pd_dump(self._assert_process_states,
1318                ProcessState.RUNNING, ["proc2"])
1319
1320        # The second time proc1 should be throttled for 2s (the default), and
1321        # proc2 should be throttled for the configured 10s
1322        eeagent.exit_process("proc1")
1323        eeagent.exit_process("proc2")
1324
1325        self.notifier.wait_for_state("proc1", ProcessState.WAITING)
1326        self._wait_assert_pd_dump(self._assert_process_states,
1327                ProcessState.WAITING, ["proc1"])
1328        self.notifier.wait_for_state("proc2", ProcessState.WAITING)
1329        self._wait_assert_pd_dump(self._assert_process_states,
1330                ProcessState.WAITING, ["proc2"])
1331
1332        # After waiting a few seconds, proc1 should be restarted
1333        time.sleep(default_time_to_throttle + 1)
1334
1335        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
1336        self._wait_assert_pd_dump(self._assert_process_states,
1337                ProcessState.RUNNING, ["proc1"])
1338        self.notifier.wait_for_state("proc2", ProcessState.WAITING)
1339        self._wait_assert_pd_dump(self._assert_process_states,
1340                ProcessState.WAITING, ["proc2"])
1341
1342        # After a few more secs, proc2 should be restarted as well
1343        time.sleep(time_to_throttle - (default_time_to_throttle + 1) + 1)
1344
1345        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
1346        self._wait_assert_pd_dump(self._assert_process_states,
1347                ProcessState.RUNNING, ["proc2"])
1348
1349    def test_describe(self):
1350
1351        self.client.schedule_process("proc1", self.process_definition_id)
1352
1353        processes = self.client.describe_processes()
1354        self.assertEqual(len(processes), 1)
1355        self.assertEqual(processes[0]['upid'], "proc1")
1356
1357        proc1 = self.client.describe_process("proc1")
1358        self.assertEqual(proc1['upid'], "proc1")
1359
1360        self.client.schedule_process("proc2", self.process_definition_id)
1361
1362        processes = self.client.describe_processes()
1363        self.assertEqual(len(processes), 2)
1364
1365        if processes[0]['upid'] == "proc1":
1366            self.assertEqual(processes[1]['upid'], "proc2")
1367        elif processes[0]['upid'] == "proc2":
1368            self.assertEqual(processes[1]['upid'], "proc1")
1369        else:
1370            self.fail()
1371
1372        proc1 = self.client.describe_process("proc1")
1373        self.assertEqual(proc1['upid'], "proc1")
1374        proc2 = self.client.describe_process("proc2")
1375        self.assertEqual(proc2['upid'], "proc2")
1376
1377    def test_process_exited(self):
1378        node = "node1"
1379        domain_id = domain_id_from_engine('engine1')
1380        self.client.node_state(node, domain_id, InstanceState.RUNNING)
1381        self._spawn_eeagent(node, 1)
1382
1383        proc = "proc1"
1384
1385        self.client.schedule_process(proc, self.process_definition_id)
1386
1387        self._wait_assert_pd_dump(self._assert_process_states,
1388                                  ProcessState.RUNNING, [proc])
1389
1390        agent = self._get_eeagent_for_process(proc)
1391        agent.exit_process(proc)
1392        self._wait_assert_pd_dump(self._assert_process_states,
1393                                  ProcessState.EXITED, [proc])
1394        self.notifier.wait_for_state(proc, ProcessState.EXITED)
1395
1396    def test_neediness(self, process_count=20, node_count=5):
1397
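        # no nodes or eeagents exist yet, so every process should queue up in
        # the WAITING state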
1398        procs = ["proc" + str(i) for i in range(process_count)]
1399        for proc in procs:
1400            procstate = self.client.schedule_process(proc,
1401                self.process_definition_id)
1402            self.assertEqual(procstate['upid'], proc)
1403
1404        self._wait_assert_pd_dump(self._assert_process_states,
1405            ProcessState.WAITING, procs)
1406
1407        for i in range(3):
1408            # retry this a few times to avoid a race between processes
1409            # hitting WAITING state and the needs being registered
1410            try:
1411                self.epum_client.assert_needs(range(node_count + 1),
1412                    domain_id_from_engine("engine1"))
1413                break
1414            except AssertionError:
1415                time.sleep(0.01)
1416
1417        self.epum_client.clear()
1418
1419        # now provide nodes and resources, processes should start
1420        nodes = ["node" + str(i) for i in range(node_count)]
1421        domain_id = domain_id_from_engine('engine1')
1422        for node in nodes:
1423            self.client.node_state(node, domain_id, InstanceState.RUNNING)
1424
1425        for node in nodes:
1426            self._spawn_eeagent(node, 4)
1427
1428        self._wait_assert_pd_dump(self._assert_process_states,
1429            ProcessState.RUNNING, procs)
1430
1431        # now kill all processes in a random order
1432        killlist = list(procs)
1433        random.shuffle(killlist)
1434        for proc in killlist:
1435            self.client.terminate_process(proc)
1436
1437        self._wait_assert_pd_dump(self._assert_process_states,
1438            ProcessState.TERMINATED, procs)
1439
1440        for i in range(3):
1441            # retry this a few times to avoid a race between processes
1442            # hitting TERMINATED state and the needs being updated
1443            try:
1444                self.epum_client.assert_needs(range(node_count + 1),
1445                    domain_id_from_engine("engine1"))
1446                break
1447            except AssertionError:
1448                time.sleep(0.01)
1449
1450    def test_definitions(self):
1451        self.client.create_definition("d1", "t1", "notepad.exe")
1452
1453        d1 = self.client.describe_definition("d1")
1454        self.assertEqual(d1['definition_id'], "d1")
1455        self.assertEqual(d1['definition_type'], "t1")
1456        self.assertEqual(d1['executable'], "notepad.exe")
1457
1458        self.client.update_definition("d1", "t1", "notepad2.exe")
1459        d1 = self.client.describe_definition("d1")
1460        self.assertEqual(d1['executable'], "notepad2.exe")
1461
1462        d_list = self.client.list_definitions()
1463        self.assertIn("d1", d_list)
1464
1465        self.client.remove_definition("d1")
1466
1467    def test_reschedule_process(self):
1468        node = "node1"
1469        domain_id = domain_id_from_engine('engine1')
1470        self.client.node_state(node, domain_id, InstanceState.RUNNING)
1471        self._spawn_eeagent(node, 1)
1472
1473        proc = "proc1"
1474
1475        # start a process that is never restarted automatically.
1476        self.client.create_process(proc, self.process_definition_id)
1477        self.client.schedule_process(proc, restart_mode=RestartMode.NEVER)
1478
1479        self._wait_assert_pd_dump(self._assert_process_states,
1480                                  ProcessState.RUNNING, [proc])
1481
1482        agent = self._get_eeagent_for_process(proc)
1483        agent.exit_process(proc)
1484        self._wait_assert_pd_dump(self._assert_process_states,
1485                                  ProcessState.EXITED, [proc])
1486        self.notifier.wait_for_state(proc, ProcessState.EXITED)
1487
1488        record = self.client.schedule_process(proc)
1489        self.assertEqual(record['state'], ProcessState.REQUESTED)
1490
1491        self.notifier.wait_for_state(proc, ProcessState.RUNNING)
1492
1493        # now fail the process. it should still be restartable.
1494        agent.fail_process(proc)
1495
1496    def test_create_schedule(self):
1497        node = "node1"
1498        domain_id = domain_id_from_engine('engine1')
1499        self.client.node_state(node, domain_id, InstanceState.RUNNING)
1500        self._spawn_eeagent(node, 1)
1501
1502        proc = "proc1"
1503
1504        # create a process. it should be UNSCHEDULED until we schedule it
1505        self.client.create_process(proc, self.process_definition_id)
1506
1507        self._wait_assert_pd_dump(self._assert_process_states,
1508                                  ProcessState.UNSCHEDULED, [proc])
1509
1510        # creating again is harmless
1511        self.client.create_process(proc, self.process_definition_id)
1512
1513        # now schedule it
1514        self.client.schedule_process(proc)
1515        self._wait_assert_pd_dump(self._assert_process_states,
1516                                  ProcessState.RUNNING, [proc])
1517        self.notifier.wait_for_state(proc, ProcessState.RUNNING)
1518
1519        # scheduling again is harmless
1520        self.client.schedule_process(proc)
1521
1522    def test_restart_system_boot(self):
1523
1524        # set up some state in the PD before restart
1525        self.client.node_state("node1", domain_id_from_engine("engine1"),
1526            InstanceState.RUNNING)
1527        self._spawn_eeagent("node1", 4)
1528
1529        procs = [('p1', RestartMode.ABNORMAL, None),
1530                 ('p2', RestartMode.ALWAYS, None),
1531                 ('p3', RestartMode.NEVER, None),
1532                 ('p4', RestartMode.ALWAYS, nosystemrestart_process_config()),
1533                 ('p5', RestartMode.ABNORMAL, None),
1534                 ('p6', RestartMode.ABNORMAL, nosystemrestart_process_config())]
1535        # fill up all 4 slots on engine1 agent and launch 2 more procs
1536        for upid, restart_mode, config in procs:
1537            self.client.schedule_process(upid, self.process_definition_id,
1538                queueing_mode=QueueingMode.ALWAYS, restart_mode=restart_mode,
1539                configuration=config)
1540
1541        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
1542        self.notifier.wait_for_state('p2', ProcessState.RUNNING)
1543        self.notifier.wait_for_state('p3', ProcessState.RUNNING)
1544        self.notifier.wait_for_state('p4', ProcessState.RUNNING)
1545
1546        self.notifier.wait_for_state('p5', ProcessState.WAITING)
1547        self.notifier.wait_for_state('p6', ProcessState.WAITING)
1548
1549        # now kill PD and eeagents. come back in system restart mode.
1550        self.stop_pd()
1551        self._kill_all_eeagents()
1552
1553        self.store.initialize()
1554        self.store.set_system_boot(True)
1555        self.store.shutdown()
1556
1557        self.start_pd()
1558        self.store.wait_initialized(timeout=20)
1559
1560        # some processes should come back pending. others should fail out
1561        # due to their restart mode flag.
1562        self.notifier.wait_for_state('p1', ProcessState.UNSCHEDULED_PENDING)
1563        self.notifier.wait_for_state('p2', ProcessState.UNSCHEDULED_PENDING)
1564        self.notifier.wait_for_state('p3', ProcessState.TERMINATED)
1565        self.notifier.wait_for_state('p4', ProcessState.TERMINATED)
1566        self.notifier.wait_for_state('p5', ProcessState.UNSCHEDULED_PENDING)
1567        self.notifier.wait_for_state('p6', ProcessState.TERMINATED)
1568
1569        # add resources back
1570        self.client.node_state("node1", domain_id_from_engine("engine1"),
1571            InstanceState.RUNNING)
1572        self._spawn_eeagent("node1", 4)
1573
1574        # now launch a new process to make sure scheduling still works during
1575        # system boot mode
1576        self.client.schedule_process("p7", self.process_definition_id)
1577
1578        # and restart a couple of the dead procs: one TERMINATED and one UNSCHEDULED_PENDING
1579        self.client.schedule_process("p1")
1580        self.client.schedule_process("p4")
1581
1582        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
1583        self.notifier.wait_for_state('p4', ProcessState.RUNNING)
1584        self.notifier.wait_for_state('p7', ProcessState.RUNNING)
1585
1586        # finally, end system boot mode. the remaining 2 U-P procs should be scheduled
1587        self.client.set_system_boot(False)
1588        self._wait_assert_pd_dump(self._assert_process_distribution,
1589                                  node_counts=[4],
1590                                  queued_count=1)
1591
1592        # one process will end up queued. doesn't matter which
1593        p2 = self.client.describe_process("p2")
1594        p5 = self.client.describe_process("p5")
1595        states = set([p2['state'], p5['state']])
1596        self.assertEqual(states, set([ProcessState.RUNNING, ProcessState.WAITING]))
1597
1598    def test_missing_ee(self):
1599        """test_missing_ee
1600
1601        Ensure that the PD kills lingering processes on eeagents after they've been
1602        evacuated.
1603        """
1604
1605        # create some fake nodes and tell PD about them
1606        node_1 = "node1"
1607        domain_id = domain_id_from_engine("engine4")
1608        self.client.node_state(node_1, domain_id, InstanceState.RUNNING)
1609
1610        # PD knows about this node but hasn't gotten a heartbeat yet
1611
1612        # spawn the eeagents and tell them all to heartbeat
1613        eeagent_1 = self._spawn_eeagent(node_1, 1)
1614
1615        def assert_all_resources(state):
1616            eeagent_nodes = set()
1617            for resource in state['resources'].itervalues():
1618                eeagent_nodes.add(resource['node_id'])
1619            self.assertEqual(set([node_1]), eeagent_nodes)
1620
1621        self._wait_assert_pd_dump(assert_all_resources)
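
        # a throttle window of zero lets the process be rescheduled immediately
        # once the PD notices the eeagent has gone missing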
1622        time_to_throttle = 0
1623
1624        self.client.schedule_process("p1", self.process_definition_id, execution_engine_id="engine4",
1625            configuration=minimum_time_between_starts_config(time_to_throttle))
1626
1627        # Send a heartbeat to show the process is RUNNING, then wait for doctor
1628        # to mark the eeagent missing
1629        time.sleep(1)
1630        eeagent_1.send_heartbeat()
1631        self.notifier.wait_for_state('p1', ProcessState.RUNNING, timeout=30)
1632        self.notifier.wait_for_state('p1', ProcessState.WAITING, timeout=30)
1633
1634        # Check that the process is still RUNNING on the eeagent, even though
1635        # the PD has since marked it as failed
1636        eeagent_process = eeagent_1._get_process_with_upid('p1')
1637        self.assertEqual(eeagent_process['u_pid'], 'p1')
1638        self.assertEqual(eeagent_process['state'], ProcessState.RUNNING)
1639        self.assertEqual(eeagent_process['round'], 0)
1640
1641        # Now send another heartbeat to start getting procs again
1642        eeagent_1.send_heartbeat()
1643        self.notifier.wait_for_state('p1', ProcessState.RUNNING, timeout=30)
1644
1645        eeagent_process = eeagent_1._get_process_with_upid('p1')
1646        self.assertEqual(eeagent_process['u_pid'], 'p1')
1647        self.assertEqual(eeagent_process['state'], ProcessState.RUNNING)
1648        self.assertEqual(eeagent_process['round'], 1)
1649
1650        # The pd should now have rescheduled the proc, and terminated the
1651        # lingering process
1652        self.assertEqual(len(eeagent_1.history), 1)
1653        terminated_history = eeagent_1.history[0]
1654        self.assertEqual(terminated_history['u_pid'], 'p1')
1655        self.assertEqual(terminated_history['state'], ProcessState.TERMINATED)
1656        self.assertEqual(terminated_history['round'], 0)
1657
1658    def test_matchmaker_msg_retry(self):
1659        node = "node1"
1660        domain_id = domain_id_from_engine('engine1')
1661        self.client.node_state(node, domain_id, InstanceState.RUNNING)
1662        self._spawn_eeagent(node, 1)
1663
1664        # sneak in and shorten retry time
1665        self.pd.matchmaker.process_launcher.retry_seconds = 0.5
1666
1667        proc1 = "proc1"
1668        proc2 = "proc2"
1669
1670        with patch.object(self.pd.matchmaker.process_launcher, "resource_client") as mock_resource_client:
1671            # first process request goes to mock, not real client but no error
1672            self.client.schedule_process(proc1, self.process_definition_id)
1673
1674            # second process hits an error but that should not cause problems
1675            mock_resource_client.launch_process.side_effect = Exception("boom!")
1676            self.client.schedule_process(proc2, self.process_definition_id)
1677
1678            self._wait_assert_pd_dump(self._assert_process_states,
1679                                      ProcessState.ASSIGNED, [proc1, proc2])
1680            self.assertEqual(mock_resource_client.launch_process.call_count, 2)
1681
1682        # now resource client works again. those messages should be retried
1683        self.notifier.wait_for_state(proc1, ProcessState.RUNNING)
1684        self.notifier.wait_for_state(proc2, ProcessState.RUNNING)
1685        self._wait_assert_pd_dump(self._assert_process_states,
1686                                  ProcessState.RUNNING, [proc1, proc2])
1687
1688
1689class ProcessDispatcherServiceZooKeeperTests(ProcessDispatcherServiceTests, ZooKeeperTestMixin):
1690
1691    # this runs all of the ProcessDispatcherService tests with a ZooKeeper store
1692
1693    def setup_store(self):
1694
1695        self.setup_zookeeper(base_path_prefix="/processdispatcher_service_tests_")
1696        store = ProcessDispatcherZooKeeperStore(self.zk_hosts,
1697            self.zk_base_path, use_gevent=self.use_gevent)
1698        store.initialize()
1699        return store
1700
1701    def teardown_store(self):
1702        if self.store:
1703            self.store.shutdown()
1704        self.teardown_zookeeper()
1705
1706
1707class SubscriberNotifierTests(unittest.TestCase):
1708    amqp_uri = "memory://hello"
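    # the base class runs against an in-memory transport; RabbitSubscriberNotifierTests
    # below repeats the same tests against a real AMQP broker by overriding amqp_uri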
1709
1710    def setUp(self):
1711        self.condition = threading.Condition()
1712        self.process_states = []
1713
1714        DashiConnection.consumer_timeout = 0.01
1715        self.name = "SubscriberNotifierTests" + uuid.uuid4().hex
1716        self.dashi = DashiConnection(self.name, self.amqp_uri, self.name)
1717        self.dashi.handle(self.process_state)
1718
1719    def tearDown(self):
1720        self.dashi.cancel()
1721
1722    def process_state(self, process):
1723        with self.condition:
1724            self.process_states.append(process)
1725            self.condition.notify_all()
1726
1727    def test_notify_process(self):
1728        notifier = SubscriberNotifier(self.dashi)
1729
1730        p1 = ProcessRecord.new(None, "p1", {"blah": "blah"},
1731            ProcessState.RUNNING, subscribers=[(self.name, "process_state")])
1732
1733        notifier.notify_process(p1)
1734        self.dashi.consume(1, 1)
1735        self.assertEqual(len(self.process_states), 1)
1736        self.assertEqual(self.process_states[0]['upid'], "p1")
1737        self.assertEqual(self.process_states[0]['state'], ProcessState.RUNNING)
1738
1739        p2 = ProcessRecord.new(None, "p2", {"blah": "blah"},
1740            ProcessState.PENDING, subscribers=[(self.name, "process_state")])
1741        notifier.notify_process(p2)
1742        self.dashi.consume(1, 1)
1743        self.assertEqual(len(self.process_states), 2)
1744        self.assertEqual(self.process_states[1]['upid'], "p2")
1745        self.assertEqual(self.process_states[1]['state'], ProcessState.PENDING)
1746
1747
1748class RabbitSubscriberNotifierTests(SubscriberNotifierTests):
1749    amqp_uri = "amqp://guest:guest@127.0.0.1//"
1750
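The notifier.wait_for_state calls used throughout these tests follow the same pattern as the SubscriberNotifierTests class above: record each incoming state notification under a lock, then block until the expected state arrives or a timeout expires. Below is a minimal sketch of such a helper; the MockNotifier name and its attributes are illustrative assumptions, not the project's actual implementation.

import threading
import time


class MockNotifier(object):
    """Records process state notifications and lets tests wait on them."""

    def __init__(self):
        self.condition = threading.Condition()
        self.process_states = {}  # upid -> latest observed state

    def process_state(self, process):
        # called for every process state notification the test receives
        with self.condition:
            self.process_states[process['upid']] = process['state']
            self.condition.notify_all()

    def wait_for_state(self, upid, state, timeout=30):
        # block until the process reaches the expected state or time runs out
        deadline = time.time() + timeout
        with self.condition:
            while self.process_states.get(upid) != state:
                remaining = deadline - time.time()
                if remaining <= 0:
                    raise AssertionError(
                        "timed out waiting for %s to reach %s" % (upid, state))
                self.condition.wait(remaining)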