How to use _live_migrate method in tempest

Best Python code snippet using tempest_python

test_server_migrations.py

Source:test_server_migrations.py Github

copy

Full Screen

1# Copyright 2016 OpenStack Foundation2# All Rights Reserved.3#4# Licensed under the Apache License, Version 2.0 (the "License"); you may5# not use this file except in compliance with the License. You may obtain6# a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the13# License for the specific language governing permissions and limitations14# under the License.15import datetime16import futurist17import mock18from nova.conductor import manager as conductor_manager19from nova import context20from nova.db import api as db21from nova import objects22from nova.tests.functional.api_sample_tests import test_servers23from nova.tests.unit import fake_instance24class ServerMigrationsSampleJsonTest(test_servers.ServersSampleBase):25 sample_dir = 'server-migrations'26 scenarios = [('v2_22', {'api_major_version': 'v2.1'})]27 microversion = '2.22'28 USE_NEUTRON = True29 def setUp(self):30 """setUp method for server usage."""31 super(ServerMigrationsSampleJsonTest, self).setUp()32 self.uuid = self._post_server()33 @mock.patch.object(conductor_manager.ComputeTaskManager, '_live_migrate')34 @mock.patch.object(db, 'service_get_by_compute_host')35 @mock.patch.object(objects.Migration, 'get_by_id_and_instance')36 @mock.patch('nova.compute.manager.ComputeManager.'37 'live_migration_force_complete')38 def test_live_migrate_force_complete(self, live_migration_pause_instance,39 get_by_id_and_instance,40 service_get_by_compute_host,41 _live_migrate):42 migration = objects.Migration()43 migration.id = 144 migration.status = 'running'45 migration.source_compute = self.compute.host46 get_by_id_and_instance.return_value = migration47 self._do_post('servers/%s/action' % self.uuid, 'live-migrate-server',48 {'hostname': self.compute.host})49 response = self._do_post('servers/%s/migrations/%s/action'50 % (self.uuid, '3'), 'force_complete', {})51 self.assertEqual(202, response.status_code)52 def test_get_migration(self):53 response = self._do_get('servers/fake_id/migrations/1234')54 self.assertEqual(404, response.status_code)55 def test_list_migrations(self):56 response = self._do_get('servers/fake_id/migrations')57 self.assertEqual(404, response.status_code)58class ServerMigrationsSamplesJsonTestV2_23(test_servers.ServersSampleBase):59 ADMIN_API = True60 sample_dir = "server-migrations"61 microversion = '2.23'62 scenarios = [('v2_23', {'api_major_version': 'v2.1'})]63 UUID_1 = '4cfba335-03d8-49b2-8c52-e69043d1e8fe'64 UUID_2 = '058fc419-a8a8-4e08-b62c-a9841ef9cd3f'65 fake_migrations = [66 {67 'source_node': 'node1',68 'dest_node': 'node2',69 'source_compute': 'compute1',70 'dest_compute': 'compute2',71 'dest_host': '1.2.3.4',72 'status': 'running',73 'instance_uuid': UUID_1,74 'migration_type': 'live-migration',75 'hidden': False,76 'memory_total': 123456,77 'memory_processed': 12345,78 'memory_remaining': 111111,79 'disk_total': 234567,80 'disk_processed': 23456,81 'disk_remaining': 211111,82 'created_at': datetime.datetime(2016, 0o1, 29, 13, 42, 2),83 'updated_at': datetime.datetime(2016, 0o1, 29, 13, 42, 2),84 'deleted_at': None,85 'deleted': False86 },87 {88 'source_node': 'node10',89 'dest_node': 'node20',90 'source_compute': 'compute10',91 'dest_compute': 'compute20',92 'dest_host': '5.6.7.8',93 'status': 'migrating',94 'instance_uuid': UUID_2,95 'migration_type': 'resize',96 'hidden': False,97 'memory_total': 456789,98 'memory_processed': 56789,99 'memory_remaining': 400000,100 'disk_total': 96789,101 'disk_processed': 6789,102 'disk_remaining': 90000,103 'created_at': datetime.datetime(2016, 0o1, 22, 13, 42, 2),104 'updated_at': datetime.datetime(2016, 0o1, 22, 13, 42, 2),105 'deleted_at': None,106 'deleted': False107 }108 ]109 def setUp(self):110 super(ServerMigrationsSamplesJsonTestV2_23, self).setUp()111 fake_context = context.RequestContext('fake', 'fake')112 self.mig1 = objects.Migration(113 context=fake_context, **self.fake_migrations[0])114 self.mig1.create()115 self.mig2 = objects.Migration(116 context=fake_context, **self.fake_migrations[1])117 self.mig2.create()118 fake_ins = fake_instance.fake_db_instance(uuid=self.UUID_1)119 fake_ins.pop("pci_devices")120 fake_ins.pop("security_groups")121 fake_ins.pop("services")122 fake_ins.pop("tags")123 fake_ins.pop("info_cache")124 fake_ins.pop("id")125 self.instance = objects.Instance(126 context=fake_context,127 **fake_ins)128 self.instance.create()129 def test_get_migration(self):130 response = self._do_get('servers/%s/migrations/%s' %131 (self.fake_migrations[0]["instance_uuid"],132 self.mig1.id))133 self.assertEqual(200, response.status_code)134 self._verify_response('migrations-get',135 {"server_uuid": self.UUID_1},136 response, 200)137 def test_list_migrations(self):138 response = self._do_get('servers/%s/migrations' %139 self.fake_migrations[0]["instance_uuid"])140 self.assertEqual(200, response.status_code)141 self._verify_response('migrations-index',142 {"server_uuid_1": self.UUID_1},143 response, 200)144class ServerMigrationsSampleJsonTestV2_24(test_servers.ServersSampleBase):145 ADMIN_API = True146 microversion = '2.24'147 sample_dir = "server-migrations"148 scenarios = [('v2_24', {'api_major_version': 'v2.1'})]149 USE_NEUTRON = True150 def setUp(self):151 """setUp method for server usage."""152 super(ServerMigrationsSampleJsonTestV2_24, self).setUp()153 self.uuid = self._post_server()154 self.context = context.RequestContext('fake', 'fake')155 fake_migration = {156 'source_node': self.compute.host,157 'dest_node': 'node10',158 'source_compute': 'compute1',159 'dest_compute': 'compute12',160 'migration_type': 'live-migration',161 'instance_uuid': self.uuid,162 'status': 'running'}163 self.migration = objects.Migration(context=self.context,164 **fake_migration)165 self.migration.create()166 @mock.patch.object(conductor_manager.ComputeTaskManager, '_live_migrate')167 def test_live_migrate_abort(self, _live_migrate):168 self._do_post('servers/%s/action' % self.uuid, 'live-migrate-server',169 {'hostname': self.compute.host})170 uri = 'servers/%s/migrations/%s' % (self.uuid, self.migration.id)171 response = self._do_delete(uri)172 self.assertEqual(202, response.status_code)173 @mock.patch.object(conductor_manager.ComputeTaskManager, '_live_migrate')174 def test_live_migrate_abort_migration_not_found(self, _live_migrate):175 self._do_post('servers/%s/action' % self.uuid, 'live-migrate-server',176 {'hostname': self.compute.host})177 uri = 'servers/%s/migrations/%s' % (self.uuid, '45')178 response = self._do_delete(uri)179 self.assertEqual(404, response.status_code)180 @mock.patch.object(conductor_manager.ComputeTaskManager, '_live_migrate')181 def test_live_migrate_abort_migration_not_running(self, _live_migrate):182 self.migration.status = 'completed'183 self.migration.save()184 self._do_post('servers/%s/action' % self.uuid, 'live-migrate-server',185 {'hostname': self.compute.host})186 uri = 'servers/%s/migrations/%s' % (self.uuid, self.migration.id)187 response = self._do_delete(uri)188 self.assertEqual(400, response.status_code)189class ServerMigrationsSamplesJsonTestV2_59(190 ServerMigrationsSamplesJsonTestV2_23191):192 ADMIN_API = True193 microversion = '2.59'194 scenarios = [('v2_59', {'api_major_version': 'v2.1'})]195 def setUp(self):196 # Add UUIDs to the fake migrations used in the tests.197 self.fake_migrations[0][198 'uuid'] = '12341d4b-346a-40d0-83c6-5f4f6892b650'199 self.fake_migrations[1][200 'uuid'] = '22341d4b-346a-40d0-83c6-5f4f6892b650'201 super(ServerMigrationsSamplesJsonTestV2_59, self).setUp()202class ServerMigrationsSampleJsonTestV2_65(ServerMigrationsSampleJsonTestV2_24):203 ADMIN_API = True204 microversion = '2.65'205 scenarios = [('v2_65', {'api_major_version': 'v2.1'})]206 USE_NEUTRON = True207 @mock.patch.object(conductor_manager.ComputeTaskManager, '_live_migrate')208 def test_live_migrate_abort_migration_queued(self, _live_migrate):209 self.migration.status = 'queued'210 self.migration.save()211 self._do_post('servers/%s/action' % self.uuid, 'live-migrate-server',212 {'hostname': self.compute.host})213 self.compute._waiting_live_migrations[self.uuid] = (214 self.migration, futurist.Future())215 uri = 'servers/%s/migrations/%s' % (self.uuid, self.migration.id)216 response = self._do_delete(uri)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful