How to use the `_create_aggregate` method in tempest

Best Python code snippet using tempest_python

test_aggregate.py

Source: test_aggregate.py (GitHub)

copy

Full Screen

# ... (the lines of test_aggregate.py before this point are elided in this
# excerpt; names such as db_api, api_models, aggregate_obj, db, exception,
# matchers, mock, deepcopy, _get_fake_aggregate and _get_fake_hosts are
# defined or imported there)


def _get_fake_metadata(db_id):
    """Return a metadata dict with one constant and one per-id entry.

    :param db_id: value appended (via ``str``) to the ``unique_key`` value
    :returns: dict with the keys ``constant_key`` and ``unique_key``
    """
    return {'constant_key': 'constant_value',
            'unique_key': 'unique_value_' + str(db_id)}


# NOTE: the default ``values``/``metadata`` objects below are built once,
# when the function is defined, and are shared by every call. That is safe
# here because the body only reads them.
@db_api.api_context_manager.writer
def _create_aggregate(context, values=_get_fake_aggregate(1, result=False),
                      metadata=_get_fake_metadata(1)):
    """Save an Aggregate row, plus one AggregateMetadata row per item.

    :param context: request context; its ``session`` is used for the saves
    :param values: mapping applied to the new ``api_models.Aggregate``
    :param metadata: mapping of metadata key -> value; a falsy value
        (``None`` or ``{}``) means no metadata rows are written
    :returns: the saved ``api_models.Aggregate`` instance
    """
    aggregate = api_models.Aggregate()
    aggregate.update(values)
    aggregate.save(context.session)
    if metadata:
        for key, value in metadata.items():
            aggregate_metadata = api_models.AggregateMetadata()
            aggregate_metadata.update({'key': key,
                                       'value': value,
                                       'aggregate_id': aggregate['id']})
            aggregate_metadata.save(context.session)
    return aggregate


# NOTE: as with _create_aggregate, the defaults are evaluated once at
# definition time and are only read, never mutated.
@db_api.api_context_manager.writer
def _create_aggregate_with_hosts(context,
                                 values=_get_fake_aggregate(1, result=False),
                                 metadata=_get_fake_metadata(1),
                                 hosts=_get_fake_hosts(1)):
    """Create an aggregate via ``_create_aggregate`` and attach hosts to it.

    :param context: request context; its ``session`` is used for the saves
    :param values: forwarded to ``_create_aggregate``
    :param metadata: forwarded to ``_create_aggregate``
    :param hosts: iterable of host names; one ``api_models.AggregateHost``
        row is saved per entry
    :returns: the aggregate returned by ``_create_aggregate``
    """
    aggregate = _create_aggregate(context, values, metadata)
    for host in hosts:
        host_model = api_models.AggregateHost()
        host_model.update({'host': host,
                           'aggregate_id': aggregate.id})
        host_model.save(context.session)
    return aggregate


@db_api.api_context_manager.reader
def _aggregate_host_get_all(context, aggregate_id):
    """Return all AggregateHost rows whose aggregate_id matches."""
    query = context.session.query(api_models.AggregateHost)
    return query.filter_by(aggregate_id=aggregate_id).all()


@db_api.api_context_manager.reader
def _aggregate_metadata_get_all(context, aggregate_id):
    """Return the metadata of an aggregate as a plain ``{key: value}`` dict."""
    query = context.session.query(api_models.AggregateMetadata)
    rows = query.filter_by(aggregate_id=aggregate_id).all()
    return {row['key']: row['value'] for row in rows}


class AggregateObjectDbTestCase(test.TestCase):
    """Tests for the module-level DB helpers of ``aggregate_obj``.

    Fixtures are written directly through ``api_models`` (see
    ``_create_aggregate`` and ``_create_aggregate_with_hosts``) and the
    results are read back through the ``aggregate_obj`` functions under
    test, or through the ``_aggregate_*_get_all`` reader helpers.
    """

    def setUp(self):
        super(AggregateObjectDbTestCase, self).setUp()
        self.context = context.RequestContext('fake-user', 'fake-project')

    def test_in_api(self):
        # One aggregate is created through the api_models helper and one
        # through db.aggregate_create; only the former must report in_api.
        ca1 = _create_aggregate(self.context, values={'name': 'fake_agg_1',
                                                      'id': 1,
                                                      'uuid': 'nonce'})
        ca2 = db.aggregate_create(self.context, {'name': 'fake_agg_2',
                                                 'id': 2,
                                                 'uuid': 'nonce'})
        api_db_agg = aggregate_obj.Aggregate.get_by_id(self.context, ca1['id'])
        cell_db_agg = aggregate_obj.Aggregate.get_by_id(
            self.context, ca2['id'])
        self.assertTrue(api_db_agg.in_api)
        self.assertFalse(cell_db_agg.in_api)

    def test_aggregate_get_from_db(self):
        # Lookup by id returns the default fake hosts and metadata.
        result = _create_aggregate_with_hosts(self.context)
        expected = aggregate_obj._aggregate_get_from_db(self.context,
                                                        result['id'])
        self.assertEqual(_get_fake_hosts(1), expected.hosts)
        self.assertEqual(_get_fake_metadata(1), expected['metadetails'])

    def test_aggregate_get_from_db_by_uuid(self):
        # Lookup by uuid returns the same aggregate, hosts and metadata.
        result = _create_aggregate_with_hosts(self.context)
        expected = aggregate_obj._aggregate_get_from_db_by_uuid(
            self.context, result['uuid'])
        self.assertEqual(result.uuid, expected.uuid)
        self.assertEqual(_get_fake_hosts(1), expected.hosts)
        self.assertEqual(_get_fake_metadata(1), expected['metadetails'])

    def test_aggregate_get_from_db_raise_not_found(self):
        # No aggregate has been created, so the lookup must fail.
        aggregate_id = 5
        self.assertRaises(exception.AggregateNotFound,
                          aggregate_obj._aggregate_get_from_db,
                          self.context, aggregate_id)

    def test_aggregate_get_all_from_db(self):
        for c in range(3):
            _create_aggregate(self.context,
                              values={'name': 'fake_aggregate_%d' % c})
        results = aggregate_obj._get_all_from_db(self.context)
        self.assertEqual(len(results), 3)

    def test_aggregate_get_by_host_from_db(self):
        # Two aggregates share the host; a third has no hosts at all.
        _create_aggregate_with_hosts(self.context,
                                     values={'name': 'fake_aggregate_1'},
                                     hosts=['host.1.openstack.org'])
        _create_aggregate_with_hosts(self.context,
                                     values={'name': 'fake_aggregate_2'},
                                     hosts=['host.1.openstack.org'])
        _create_aggregate(self.context,
                          values={'name': 'no_host_aggregate'})
        rh1 = aggregate_obj._get_all_from_db(self.context)
        rh2 = aggregate_obj._get_by_host_from_db(self.context,
                                                 'host.1.openstack.org')
        self.assertEqual(3, len(rh1))
        self.assertEqual(2, len(rh2))

    def test_aggregate_get_by_host_with_key_from_db(self):
        # Both aggregates contain the host, but only the first one carries
        # the 'goodkey' metadata key used as the filter.
        ah1 = _create_aggregate_with_hosts(self.context,
                                           values={'name': 'fake_aggregate_1'},
                                           metadata={'goodkey': 'good'},
                                           hosts=['host.1.openstack.org'])
        _create_aggregate_with_hosts(self.context,
                                     values={'name': 'fake_aggregate_2'},
                                     hosts=['host.1.openstack.org'])
        rh1 = aggregate_obj._get_by_host_from_db(self.context,
                                                 'host.1.openstack.org',
                                                 key='goodkey')
        self.assertEqual(1, len(rh1))
        self.assertEqual(ah1['id'], rh1[0]['id'])

    def test_aggregate_get_by_metadata_key_from_db(self):
        # Matching is on the key only: two aggregates have 'goodkey'
        # (with different values), the third has a different key.
        _create_aggregate(self.context,
                          values={'name': 'aggregate_1'},
                          metadata={'goodkey': 'good'})
        _create_aggregate(self.context,
                          values={'name': 'aggregate_2'},
                          metadata={'goodkey': 'bad'})
        _create_aggregate(self.context,
                          values={'name': 'aggregate_3'},
                          metadata={'badkey': 'good'})
        rl1 = aggregate_obj._get_by_metadata_key_from_db(self.context,
                                                         key='goodkey')
        self.assertEqual(2, len(rl1))

    def test_aggregate_create_in_db(self):
        fake_create_aggregate = {
            'name': 'fake-aggregate',
        }
        agg = aggregate_obj._aggregate_create_in_db(self.context,
                                                    fake_create_aggregate)
        result = aggregate_obj._aggregate_get_from_db(self.context,
                                                      agg.id)
        self.assertEqual(result.name, fake_create_aggregate['name'])

    def test_aggregate_create_in_db_with_metadata(self):
        fake_create_aggregate = {
            'name': 'fake-aggregate',
        }
        agg = aggregate_obj._aggregate_create_in_db(
            self.context, fake_create_aggregate,
            metadata={'goodkey': 'good'})
        result = aggregate_obj._aggregate_get_from_db(self.context,
                                                      agg.id)
        md = aggregate_obj._get_by_metadata_key_from_db(self.context,
                                                        key='goodkey')
        self.assertEqual(len(md), 1)
        self.assertEqual(md[0]['id'], agg.id)
        self.assertEqual(result.name, fake_create_aggregate['name'])

    def test_aggregate_create_raise_exist_exc(self):
        # Creating a second aggregate with the same name must be rejected.
        fake_create_aggregate = {
            'name': 'fake-aggregate',
        }
        aggregate_obj._aggregate_create_in_db(self.context,
                                              fake_create_aggregate)
        self.assertRaises(exception.AggregateNameExists,
                          aggregate_obj._aggregate_create_in_db,
                          self.context,
                          fake_create_aggregate,
                          metadata=None)

    def test_aggregate_delete(self):
        result = _create_aggregate(self.context, metadata=None)
        aggregate_obj._aggregate_delete_from_db(self.context, result['id'])
        self.assertRaises(exception.AggregateNotFound,
                          aggregate_obj._aggregate_get_from_db,
                          self.context, result['id'])

    def test_aggregate_delete_raise_not_found(self):
        # this does not exist!
        aggregate_id = 45
        self.assertRaises(exception.AggregateNotFound,
                          aggregate_obj._aggregate_delete_from_db,
                          self.context, aggregate_id)

    def test_aggregate_delete_with_metadata(self):
        result = _create_aggregate(
            self.context,
            metadata={'availability_zone': 'fake_avail_zone'})
        aggregate_obj._aggregate_delete_from_db(self.context, result['id'])
        self.assertRaises(exception.AggregateNotFound,
                          aggregate_obj._aggregate_get_from_db,
                          self.context, result['id'])

    def test_aggregate_update(self):
        created = _create_aggregate(
            self.context,
            metadata={'availability_zone': 'fake_avail_zone'})
        result = aggregate_obj._aggregate_get_from_db(self.context,
                                                      created['id'])
        self.assertEqual('fake_avail_zone', result['availability_zone'])
        new_values = deepcopy(_get_fake_aggregate(1, result=False))
        new_values['availability_zone'] = 'different_avail_zone'
        updated = aggregate_obj._aggregate_update_to_db(self.context,
                                                        result['id'],
                                                        new_values)
        self.assertEqual('different_avail_zone', updated['availability_zone'])

    def test_aggregate_update_with_metadata(self):
        result = _create_aggregate(self.context, metadata=None)
        values = deepcopy(_get_fake_aggregate(1, result=False))
        values['metadata'] = deepcopy(_get_fake_metadata(1))
        values['availability_zone'] = 'different_avail_zone'
        # The stored metadata is expected to hold the supplied metadata
        # plus the availability zone.
        expected_metadata = deepcopy(values['metadata'])
        expected_metadata['availability_zone'] = values['availability_zone']
        aggregate_obj._aggregate_update_to_db(self.context, result['id'],
                                              values)
        metadata = _aggregate_metadata_get_all(self.context, result['id'])
        updated = aggregate_obj._aggregate_get_from_db(self.context,
                                                       result['id'])
        self.assertThat(metadata,
                        matchers.DictMatches(expected_metadata))
        self.assertEqual('different_avail_zone', updated['availability_zone'])

    def test_aggregate_update_with_existing_metadata(self):
        result = _create_aggregate(self.context)
        values = deepcopy(_get_fake_aggregate(1, result=False))
        values['metadata'] = deepcopy(_get_fake_metadata(1))
        values['metadata']['fake_key1'] = 'foo'
        expected_metadata = deepcopy(values['metadata'])
        aggregate_obj._aggregate_update_to_db(self.context, result['id'],
                                              values)
        metadata = _aggregate_metadata_get_all(self.context, result['id'])
        self.assertThat(metadata, matchers.DictMatches(expected_metadata))

    def test_aggregate_update_zone_with_existing_metadata(self):
        result = _create_aggregate(self.context)
        new_zone = {'availability_zone': 'fake_avail_zone_2'}
        # Here ``metadata`` is the anticipated state and ``expected`` is
        # what was actually read back from the database.
        metadata = deepcopy(_get_fake_metadata(1))
        metadata.update(new_zone)
        aggregate_obj._aggregate_update_to_db(self.context, result['id'],
                                              new_zone)
        expected = _aggregate_metadata_get_all(self.context, result['id'])
        self.assertThat(metadata, matchers.DictMatches(expected))

    def test_aggregate_update_raise_not_found(self):
        # this does not exist!
        aggregate_id = 2
        new_values = deepcopy(_get_fake_aggregate(1, result=False))
        self.assertRaises(exception.AggregateNotFound,
                          aggregate_obj._aggregate_update_to_db,
                          self.context, aggregate_id, new_values)

    def test_aggregate_update_raise_name_exist(self):
        _create_aggregate(self.context, values={'name': 'test1'},
                          metadata={'availability_zone': 'fake_avail_zone'})
        _create_aggregate(self.context, values={'name': 'test2'},
                          metadata={'availability_zone': 'fake_avail_zone'})
        # NOTE(review): presumably the first aggregate created above gets
        # id 1 on a fresh test database -- confirm.
        aggregate_id = 1
        new_values = {'name': 'test2'}
        self.assertRaises(exception.AggregateNameExists,
                          aggregate_obj._aggregate_update_to_db,
                          self.context, aggregate_id, new_values)

    def test_aggregate_host_add_to_db(self):
        result = _create_aggregate(self.context, metadata=None)
        host = _get_fake_hosts(1)[0]
        aggregate_obj._host_add_to_db(self.context, result['id'], host)
        expected = aggregate_obj._aggregate_get_from_db(self.context,
                                                        result['id'])
        self.assertEqual([_get_fake_hosts(1)[0]], expected.hosts)

    def test_aggregate_host_re_add_to_db(self):
        # Remove one of the default hosts, add it back, and check that two
        # host rows are present afterwards.
        result = _create_aggregate_with_hosts(self.context,
                                              metadata=None)
        host = _get_fake_hosts(1)[0]
        aggregate_obj._host_delete_from_db(self.context, result['id'], host)
        aggregate_obj._host_add_to_db(self.context, result['id'], host)
        expected = _aggregate_host_get_all(self.context, result['id'])
        self.assertEqual(len(expected), 2)

    def test_aggregate_host_add_to_db_duplicate_works(self):
        # The same default hosts may belong to two different aggregates.
        r1 = _create_aggregate_with_hosts(self.context,
                                          metadata=None)
        r2 = _create_aggregate_with_hosts(
            self.context,
            values={'name': 'fake_aggregate2'},
            metadata={'availability_zone': 'fake_avail_zone2'})
        h1 = _aggregate_host_get_all(self.context, r1['id'])
        self.assertEqual(len(h1), 2)
        self.assertEqual(r1['id'], h1[0]['aggregate_id'])
        h2 = _aggregate_host_get_all(self.context, r2['id'])
        self.assertEqual(len(h2), 2)
        self.assertEqual(r2['id'], h2[0]['aggregate_id'])

    def test_aggregate_host_add_to_db_duplicate_raise_exist_exc(self):
        # Adding a host that the aggregate already has must be rejected.
        result = _create_aggregate_with_hosts(self.context,
                                              metadata=None)
        self.assertRaises(exception.AggregateHostExists,
                          aggregate_obj._host_add_to_db,
                          self.context, result['id'],
                          _get_fake_hosts(1)[0])

    def test_aggregate_host_add_to_db_raise_not_found(self):
        # this does not exist!
        aggregate_id = 1
        host = _get_fake_hosts(1)[0]
        self.assertRaises(exception.AggregateNotFound,
                          aggregate_obj._host_add_to_db,
                          self.context, aggregate_id, host)

    def test_aggregate_host_delete_from_db(self):
        result = _create_aggregate_with_hosts(self.context,
                                              metadata=None)
        aggregate_obj._host_delete_from_db(self.context, result['id'],
                                           _get_fake_hosts(1)[0])
        expected = _aggregate_host_get_all(self.context, result['id'])
        self.assertEqual(len(expected), 1)

    def test_aggregate_host_delete_from_db_raise_not_found(self):
        # The aggregate exists but was created without any hosts.
        result = _create_aggregate(self.context)
        self.assertRaises(exception.AggregateHostNotFound,
                          aggregate_obj._host_delete_from_db,
                          self.context, result['id'],
                          _get_fake_hosts(1)[0])

    def test_aggregate_metadata_add(self):
        result = _create_aggregate(self.context, metadata=None)
        metadata = deepcopy(_get_fake_metadata(1))
        aggregate_obj._metadata_add_to_db(self.context, result['id'], metadata)
        expected = _aggregate_metadata_get_all(self.context, result['id'])
        self.assertThat(metadata, matchers.DictMatches(expected))

    def test_aggregate_metadata_add_empty_metadata(self):
        result = _create_aggregate(self.context, metadata=None)
        metadata = {}
        aggregate_obj._metadata_add_to_db(self.context, result['id'], metadata)
        expected = _aggregate_metadata_get_all(self.context, result['id'])
        self.assertThat(metadata, matchers.DictMatches(expected))

    def test_aggregate_metadata_add_and_update(self):
        # One existing key is overwritten and one new key is added in the
        # same call.
        result = _create_aggregate(self.context)
        metadata = deepcopy(_get_fake_metadata(1))
        key = list(metadata.keys())[0]
        new_metadata = {key: 'foo',
                        'fake_new_key': 'fake_new_value'}
        metadata.update(new_metadata)
        aggregate_obj._metadata_add_to_db(self.context,
                                          result['id'], new_metadata)
        expected = _aggregate_metadata_get_all(self.context, result['id'])
        self.assertThat(metadata, matchers.DictMatches(expected))

    def test_aggregate_metadata_add_retry(self):
        # The table insert is patched to always raise DBDuplicateEntry; the
        # call with max_retries=5 is expected to surface that exception.
        result = _create_aggregate(self.context, metadata=None)
        with mock.patch('nova.db.sqlalchemy.api_models.'
                        'AggregateMetadata.__table__.insert') as insert_mock:
            insert_mock.side_effect = db_exc.DBDuplicateEntry
            self.assertRaises(db_exc.DBDuplicateEntry,
                              aggregate_obj._metadata_add_to_db,
                              self.context,
                              result['id'],
                              {'fake_key2': 'fake_value2'},
                              max_retries=5)

    def test_aggregate_metadata_update(self):
        # Delete a key, then add it back with a new value.
        result = _create_aggregate(self.context)
        metadata = deepcopy(_get_fake_metadata(1))
        key = list(metadata.keys())[0]
        aggregate_obj._metadata_delete_from_db(self.context, result['id'], key)
        new_metadata = {key: 'foo'}
        aggregate_obj._metadata_add_to_db(self.context,
                                          result['id'], new_metadata)
        expected = _aggregate_metadata_get_all(self.context, result['id'])
        metadata[key] = 'foo'
        self.assertThat(metadata, matchers.DictMatches(expected))

    def test_aggregate_metadata_delete(self):
        result = _create_aggregate(self.context, metadata=None)
        metadata = deepcopy(_get_fake_metadata(1))
        aggregate_obj._metadata_add_to_db(self.context, result['id'], metadata)
        aggregate_obj._metadata_delete_from_db(self.context, result['id'],
                                               list(metadata.keys())[0])
        expected = _aggregate_metadata_get_all(self.context, result['id'])
        del metadata[list(metadata.keys())[0]]
        self.assertThat(metadata, matchers.DictMatches(expected))

    def test_aggregate_remove_availability_zone(self):
        # After the 'availability_zone' metadata key is deleted, the
        # aggregate's availability_zone must read back as None.
        result = _create_aggregate(self.context, metadata={'availability_zone':
                                                           'fake_avail_zone'})
        aggregate_obj._metadata_delete_from_db(self.context,
                                               result['id'],
                                               'availability_zone')
        aggr = aggregate_obj._aggregate_get_from_db(self.context, result['id'])
        self.assertIsNone(aggr['availability_zone'])

    def test_aggregate_metadata_delete_raise_not_found(self):
        result = _create_aggregate(self.context)
        self.assertRaises(exception.AggregateMetadataNotFound,
                          aggregate_obj._metadata_delete_from_db,
                          self.context, result['id'], 'foo_key')


def create_aggregate(context, db_id, in_api=True):
    """Create the fake aggregate for ``db_id`` with its fake metadata.

    :param context: request context passed through to the DB helpers
    :param db_id: id used to build the fake aggregate, metadata and hosts
    :param in_api: when true, create through ``aggregate_obj`` and add the
        fake hosts; otherwise create through ``db.aggregate_create``
    """
    # NOTE(review): both branches build the fake aggregate with
    # in_api=False -- confirm this is intended.
    if in_api:
        fake_aggregate = _get_fake_aggregate(db_id, in_api=False, result=False)
        aggregate_obj._aggregate_create_in_db(
            context, fake_aggregate,
            metadata=_get_fake_metadata(db_id))
        for host in _get_fake_hosts(db_id):
            aggregate_obj._host_add_to_db(context, fake_aggregate['id'], host)
    else:
        fake_aggregate = _get_fake_aggregate(db_id, in_api=False, result=False)
        db.aggregate_create(context, fake_aggregate,
                            metadata=_get_fake_metadata(db_id))
        # ... (the excerpt is truncated here; any remaining statements of
        # this function are not visible in the source)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful