How to use _create_workers method in Slash

Best Python code snippet using slash

test_db_worker_api.py

Source: test_db_worker_api.py (GitHub)

copy

Full Screen

...86 """Check basic non existent worker record get method."""87 db.worker_create(self.ctxt, **self.worker_fields)88 self.assertRaises(exception.WorkerNotFound, db.worker_get,89 self.ctxt, service_id='1', **self.worker_fields)90 def _create_workers(self, num, read_back=False, **fields):91 workers = []92 base_params = self.worker_fields.copy()93 base_params.update(fields)94 for i in range(num):95 params = base_params.copy()96 params['resource_id'] = self._uuid()97 workers.append(db.worker_create(self.ctxt, **params))98 if read_back:99 for i in range(len(workers)):100 workers[i] = db.worker_get(self.ctxt, id=workers[i].id)101 return workers102 def test_worker_get_all(self):103 """Test basic get_all method."""104 self._create_workers(1)105 service = db.service_create(self.ctxt, {})106 workers = self._create_workers(3, service_id=service.id)107 db_workers = db.worker_get_all(self.ctxt, service_id=service.id)108 self._assertEqualListsOfObjects(workers, db_workers)109 def test_worker_get_all_until(self):110 """Test get_all until a specific time."""111 workers = self._create_workers(3, read_back=True)112 timestamp = workers[-1].updated_at113 time.sleep(0.1)114 self._create_workers(3)115 db_workers = db.worker_get_all(self.ctxt, until=timestamp)116 self._assertEqualListsOfObjects(workers, db_workers)117 def test_worker_get_all_returns_empty(self):118 """Test that get_all returns an empty list when there's no results."""119 self._create_workers(3, deleted=True)120 db_workers = db.worker_get_all(self.ctxt)121 self.assertListEqual([], db_workers)122 def test_worker_update_not_exists(self):123 """Test worker update when the worker doesn't exist."""124 self.assertRaises(exception.WorkerNotFound, db.worker_update,125 self.ctxt, 1)126 def test_worker_update(self):127 """Test basic worker update."""128 worker = self._create_workers(1)[0]129 worker = db.worker_get(self.ctxt, id=worker.id)130 res = db.worker_update(self.ctxt, worker.id, service_id=1)131 self.assertEqual(1, res)132 
worker.service_id = 1133 db_worker = db.worker_get(self.ctxt, id=worker.id)134 self._assertEqualObjects(worker, db_worker,135 ['updated_at', 'race_preventer'])136 self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer)137 def test_worker_update_no_subsecond(self):138 """Test basic worker update."""139 db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = False140 worker = self._create_workers(1)[0]141 worker = db.worker_get(self.ctxt, id=worker.id)142 now = datetime.utcnow().replace(microsecond=123)143 with mock.patch('oslo_utils.timeutils.utcnow', return_value=now):144 res = db.worker_update(self.ctxt, worker.id, service_id=1)145 self.assertEqual(1, res)146 worker.service_id = 1147 db_worker = db.worker_get(self.ctxt, id=worker.id)148 self._assertEqualObjects(worker, db_worker,149 ['updated_at', 'race_preventer'])150 self.assertEqual(0, db_worker.updated_at.microsecond)151 self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer)152 def test_worker_update_update_orm(self):153 """Test worker update updating the worker orm object."""154 worker = self._create_workers(1)[0]155 res = db.worker_update(self.ctxt, worker.id, orm_worker=worker,156 service_id=1)157 self.assertEqual(1, res)158 db_worker = db.worker_get(self.ctxt, id=worker.id)159 # If we are updating the ORM object we don't ignore the update_at field160 # because it will get updated in the ORM instance.161 self._assertEqualObjects(worker, db_worker)162 def test_worker_destroy(self):163 """Test that worker destroy really deletes the DB entry."""164 worker = self._create_workers(1)[0]165 res = db.worker_destroy(self.ctxt, id=worker.id)166 self.assertEqual(1, res)167 db_workers = db.worker_get_all(self.ctxt, read_deleted='yes')168 self.assertListEqual([], db_workers)169 def test_worker_destroy_non_existent(self):170 """Test that worker destroy returns 0 when entry doesn't exist."""171 res = db.worker_destroy(self.ctxt, id=100)172 self.assertEqual(0, res)173 def 
test_worker_claim(self):174 """Test worker claim of normal DB entry."""175 service_id = 1176 worker = db.worker_create(self.ctxt, resource_type='Volume',177 resource_id=fake.VOLUME_ID,178 status='deleting')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful