How to use the insert_job method in autotest

Best Python code snippets using autotest_python

test_banning.py

Source: test_banning.py (GitHub)

...
from fts3rest.model.meta import Session
from fts3rest.tests import TestController
from datetime import datetime, timedelta
import uuid


def insert_job(vo, source=None, destination=None, state="SUBMITTED", **kwargs):
    job = Job()
    job.user_dn = kwargs.get("user_dn", "/DC=ch/DC=cern/CN=Test User")
    job.vo_name = vo
    job.source_se = source
    job.dest_se = destination
    job.job_state = state
    job.submit_time = datetime.utcnow()
    duration = kwargs.get("duration", 0)
    queued = kwargs.get("queued", 0)
    if duration and queued:
        job.finish_time = job.submit_time + timedelta(seconds=duration + queued)
    elif duration:
        job.finish_time = job.submit_time + timedelta(seconds=duration)
    job.job_id = str(uuid.uuid4())
    Session.merge(job)
    multiple = kwargs.get("multiple", [(source, destination)])
    for source_se, dest_se in multiple:
        transfer = File()
        transfer.job_id = job.job_id
        transfer.vo_name = vo
        transfer.source_se = source_se
        transfer.source_surl = source_se + "/path"
        transfer.dest_se = dest_se
        transfer.dest_surl = dest_se + "/path"
        transfer.file_state = state
        if queued:
            transfer.start_time = job.submit_time + timedelta(seconds=queued)
        if duration:
            transfer.tx_duration = duration
        transfer.reason = kwargs.get("reason", None)
        transfer.throughput = kwargs.get("thr", None)
        Session.merge(transfer)
    Session.commit()
    return job.job_id


class TestBanning(TestController):
    """
    Tests for user and storage banning
    """

    def setUp(self):
        super().setUp()
        self.setup_gridsite_environment()

    def tearDown(self):
        Session.query(BannedDN).delete()
        Session.query(BannedSE).delete()
        super().tearDown()

    def test_ban_dn(self):
        """
        Just ban a DN and unban it, make sure changes go into the DB
        """
        canceled = self.app.post(
            url="/ban/dn",
            params={"user_dn": "/DC=cern/CN=someone", "message": "TEST BAN"},
            status=200,
        ).json
        self.assertEqual(0, len(canceled))
        banned = Session.query(BannedDN).get("/DC=cern/CN=someone")
        self.assertNotEqual(None, banned)
        self.assertEqual(self.get_user_credentials().user_dn, banned.admin_dn)
        self.assertEqual("TEST BAN", banned.message)
        self.app.delete(
            url="/ban/dn?user_dn=%s" % quote("/DC=cern/CN=someone"), status=204
        )
        banned = Session.query(BannedDN).get("/DC=cern/CN=someone")
        self.assertEqual(None, banned)

    def test_list_banned_dns(self):
        """
        Ban a DN and make sure it is in the list
        """
        canceled = self.app.post(
            url="/ban/dn", params={"user_dn": "/DC=cern/CN=someone"}, status=200
        ).json
        self.assertEqual(0, len(canceled))
        banned = self.app.get(url="/ban/dn", status=200).json
        self.assertIn("/DC=cern/CN=someone", [b["dn"] for b in banned])
        self.app.delete(
            url="/ban/dn?user_dn=%s" % quote("/DC=cern/CN=someone"), status=204
        )
        banned = self.app.get(url="/ban/dn", status=200).json
        self.assertNotIn("/DC=cern/CN=someone", [b["dn"] for b in banned])

    def test_ban_dn_submission(self):
        """
        If a DN is banned, submissions from this user must not be accepted
        """
        banned = BannedDN()
        banned.dn = self.get_user_credentials().user_dn
        Session.merge(banned)
        Session.commit()
        self.push_delegation()
        self.app.post(
            url="/jobs", content_type="application/json", params="[]", status=403
        )

    def test_ban_self(self):
        """
        A user can not ban (him|her)self
        """
        user_dn = self.get_user_credentials().user_dn
        self.app.post(url="/ban/dn", params={"user_dn": user_dn}, status=409)

    def test_ban_dn_cancel(self):
        """
        Ban a DN that has transfers running, make sure they are canceled
        """
        jobs = list()
        jobs.append(
            insert_job(
                "testvo",
                "gsiftp://source",
                "gsiftp://destination",
                "SUBMITTED",
                user_dn="/DC=cern/CN=someone",
            )
        )
        jobs.append(
            insert_job(
                "testvo",
                "gsiftp://source",
                "gsiftp://destination2",
                "ACTIVE",
                user_dn="/DC=cern/CN=someone",
            )
        )
        jobs.append(
            insert_job(
                "testvo",
                "gsiftp://source",
                "gsiftp://destination2",
                "FAILED",
                duration=10,
                queued=20,
                user_dn="/DC=cern/CN=someone",
            )
        )
        canceled_ids = self.app.post(
            url="/ban/dn", params={"user_dn": "/DC=cern/CN=someone"}, status=200
        ).json
        self.assertEqual(2, len(canceled_ids))
        self.assertIn(jobs[0], canceled_ids)
        self.assertIn(jobs[1], canceled_ids)
        self.assertNotIn(jobs[2], canceled_ids)
        for job_id in jobs[0:2]:
            job = Session.query(Job).get(job_id)
            files = Session.query(File).filter(File.job_id == job_id)
            self.assertEqual("CANCELED", job.job_state)
            self.assertNotEqual(None, job.job_finished)
            self.assertEqual("User banned", job.reason)
            for f in files:
                self.assertEqual("CANCELED", f.file_state)
                self.assertNotEqual(None, f.finish_time)
                self.assertEqual("User banned", f.reason)
        job = Session.query(Job).get(jobs[2])
        self.assertEqual(job.job_state, "FAILED")
        files = Session.query(File).filter(File.job_id == job.job_id)
        for f in files:
            self.assertEqual("FAILED", f.file_state)

    def test_ban_se(self):
        """
        Just ban a SE and unban it, make sure changes go into the DB
        """
        canceled = self.app.post(
            url="/ban/se",
            params={"storage": "gsiftp://nowhere", "message": "TEST BAN 42"},
            status=200,
        ).json
        self.assertEqual(0, len(canceled))
        banned = (
            Session.query(BannedSE).filter(BannedSE.se == "gsiftp://nowhere").first()
        )
        self.assertNotEqual(None, banned)
        self.assertEqual(self.get_user_credentials().user_dn, banned.admin_dn)
        self.assertEqual("CANCEL", banned.status)
        self.assertEqual("TEST BAN 42", banned.message)
        self.app.delete(
            url="/ban/se?storage=%s" % quote("gsiftp://nowhere"), status=204
        )
        banned = (
            Session.query(BannedSE).filter(BannedSE.se == "gsiftp://nowhere").first()
        )
        self.assertEqual(None, banned)

    def test_list_banned_ses(self):
        """
        Ban a SE and make sure it is in the list
        """
        canceled = self.app.post(
            url="/ban/se", params={"storage": "gsiftp://nowhere"}, status=200
        ).json
        self.assertEqual(0, len(canceled))
        banned = self.app.get(url="/ban/se", status=200).json
        self.assertIn("gsiftp://nowhere", [b["se"] for b in banned])
        self.app.delete(
            url="/ban/se?storage=%s" % quote("gsiftp://nowhere"), status=204
        )
        banned = self.app.get(url="/ban/se", status=200).json
        self.assertNotIn("gsiftp://nowhere", [b["se"] for b in banned])

    def test_ban_se_vo(self):
        """
        Just ban a SE and unban it, specifying a VO
        """
        canceled = self.app.post(
            url="/ban/se",
            params={"storage": "gsiftp://nowhere", "vo_name": "testvo"},
            status=200,
        ).json
        self.assertEqual(0, len(canceled))
        banned = Session.query(BannedSE).get(("gsiftp://nowhere", "testvo"))
        self.assertNotEqual(None, banned)
        self.assertEqual(self.get_user_credentials().user_dn, banned.admin_dn)
        self.assertEqual("CANCEL", banned.status)
        self.assertEqual("testvo", banned.vo)
        self.app.delete(
            url="/ban/se?storage=%s&vo_name=testvo" % quote("gsiftp://nowhere"),
            status=204,
        )
        banned = Session.query(BannedSE).get(("gsiftp://nowhere", "someone"))
        self.assertEqual(None, banned)

    def test_ban_se_cancel(self):
        """
        Ban a SE that has files queued, make sure they are canceled
        """
        jobs = list()
        jobs.append(
            insert_job("testvo", "gsiftp://source", "gsiftp://destination", "SUBMITTED")
        )
        jobs.append(
            insert_job("testvo", "gsiftp://source", "gsiftp://destination2", "ACTIVE")
        )
        jobs.append(
            insert_job(
                "testvo",
                "gsiftp://source",
                "gsiftp://destination2",
                "FAILED",
                duration=10,
                queued=20,
            )
        )
        canceled_ids = self.app.post(
            url="/ban/se", params={"storage": "gsiftp://source"}, status=200
        ).json
        self.assertEqual(2, len(canceled_ids))
        self.assertIn(jobs[0], canceled_ids)
        self.assertIn(jobs[1], canceled_ids)
        self.assertNotIn(jobs[2], canceled_ids)
        for job_id in jobs[0:2]:
            job = Session.query(Job).get(job_id)
            files = Session.query(File).filter(File.job_id == job_id)
            self.assertEqual("CANCELED", job.job_state)
            self.assertNotEqual(None, job.job_finished)
            for f in files:
                self.assertEqual("CANCELED", f.file_state)
                self.assertNotEqual(None, f.finish_time)
                self.assertEqual("Storage banned", f.reason)
        job = Session.query(Job).get(jobs[2])
        self.assertEqual(job.job_state, "FAILED")
        files = Session.query(File).filter(File.job_id == job.job_id)
        for f in files:
            self.assertEqual("FAILED", f.file_state)

    def test_ban_se_partial_job(self):
        """
        Ban a SE that has files queued. If a job has other pairs, the job must remain!
        """
        job_id = insert_job(
            "testvo",
            multiple=[
                ("gsiftp://source", "gsiftp://destination"),
                ("gsiftp://other", "gsiftp://destination"),
            ],
        )
        canceled_ids = self.app.post(
            url="/ban/se", params={"storage": "gsiftp://source"}, status=200
        ).json
        self.assertEqual(1, len(canceled_ids))
        self.assertEqual(job_id, canceled_ids[0])
        job = Session.query(Job).get(job_id)
        self.assertEqual("SUBMITTED", job.job_state)
        self.assertEqual(None, job.job_finished)
        files = Session.query(File).filter(File.job_id == job_id)
        for f in files:
            if f.source_se == "gsiftp://source":
                self.assertEqual("CANCELED", f.file_state)
                self.assertNotEqual(None, f.finish_time)
            else:
                self.assertEqual("SUBMITTED", f.file_state)

    def test_ban_se_cancel_vo(self):
        """
        Cancel a SE that has files queued, make sure they are canceled (with VO)
        """
        jobs = list()
        jobs.append(
            insert_job("testvo", "gsiftp://source", "gsiftp://destination", "SUBMITTED")
        )
        jobs.append(
            insert_job("atlas", "gsiftp://source", "gsiftp://destination", "SUBMITTED")
        )
        jobs.append(
            insert_job("atlas", "gsiftp://source", "gsiftp://destination2", "SUBMITTED")
        )
        canceled_ids = self.app.post(
            url="/ban/se",
            params={
                "storage": "gsiftp://source",
                "status": "cancel",
                "vo_name": "testvo",
            },
            status=200,
        ).json
        self.assertEqual(1, len(canceled_ids))
        self.assertIn(jobs[0], canceled_ids)
        for job_id in jobs:
            job = Session.query(Job).get(job_id)
            files = Session.query(File).filter(File.job_id == job_id)
            if job_id in canceled_ids:
                self.assertEqual("CANCELED", job.job_state)
            else:
                self.assertEqual("SUBMITTED", job.job_state)
            for f in files:
                if job_id in canceled_ids:
                    self.assertEqual("CANCELED", f.file_state)
                else:
                    self.assertEqual("SUBMITTED", f.file_state)

    def test_ban_se_wait(self):
        """
        Ban a SE, but instead of canceling, give jobs some time to finish
        """
        jobs = list()
        jobs.append(
            insert_job("testvo", "gsiftp://source", "gsiftp://destination", "SUBMITTED")
        )
        jobs.append(
            insert_job("testvo", "gsiftp://source", "gsiftp://destination2", "ACTIVE")
        )
        jobs.append(
            insert_job(
                "testvo",
                "gsiftp://source",
                "gsiftp://destination2",
                "FAILED",
                duration=10,
                queued=20,
            )
        )
        waiting_ids = self.app.post(
            url="/ban/se",
            params={"storage": "gsiftp://source", "status": "wait", "timeout": 1234},
            status=200,
        ).json
        self.assertEqual(1, len(waiting_ids))
        self.assertIn(jobs[0], waiting_ids)
        self.assertNotIn(jobs[1], waiting_ids)
        self.assertNotIn(jobs[2], waiting_ids)
        for job_id in jobs[0:2]:
            job = Session.query(Job).get(job_id)
            files = Session.query(File).filter(File.job_id == job_id)
            self.assertIn(job.job_state, ["ACTIVE", "SUBMITTED"])
            self.assertEqual(None, job.job_finished)
            for f in files:
                self.assertIn(f.file_state, ["ACTIVE", "ON_HOLD"])
                self.assertEqual(None, f.finish_time)
        job = Session.query(Job).get(jobs[2])
        self.assertEqual(job.job_state, "FAILED")
        files = Session.query(File).filter(File.job_id == job.job_id)
        for f in files:
            self.assertEqual("FAILED", f.file_state)
        banned = Session.query(BannedSE).get(("gsiftp://source", "testvo"))
        self.assertEqual("WAIT", banned.status)

    def test_ban_se_wait_vo(self):
        """
        Ban a SE, but instead of canceling, give jobs some time to finish (with VO)
        """
        jobs = list()
        jobs.append(
            insert_job("testvo", "gsiftp://source", "gsiftp://destination", "SUBMITTED")
        )
        jobs.append(
            insert_job("atlas", "gsiftp://source", "gsiftp://destination", "SUBMITTED")
        )
        jobs.append(
            insert_job("atlas", "gsiftp://source", "gsiftp://destination2", "SUBMITTED")
        )
        waiting_ids = self.app.post(
            url="/ban/se",
            params={
                "storage": "gsiftp://source",
                "status": "wait",
                "vo_name": "testvo",
                "timeout": 33,
            },
            status=200,
        ).json
        self.assertEqual(1, len(waiting_ids))
        self.assertIn(jobs[0], waiting_ids)
        for job_id in jobs:
            job = Session.query(Job).get(job_id)
            files = Session.query(File).filter(File.job_id == job_id)
            self.assertEqual("SUBMITTED", job.job_state)
            for f in files:
                if job_id in waiting_ids:
                    self.assertEqual("ON_HOLD", f.file_state)
                else:
                    self.assertEqual("SUBMITTED", f.file_state)

    def test_ban_se_no_submit(self):
        """
        Ban a SE. Submissions to/from se must not be accepted
        """
        self.push_delegation()
        self.app.post(url="/ban/se", params={"storage": "gsiftp://source"}, status=200)
        job = {
            "files": [
                {
                    "sources": ["gsiftp://source/path/"],
                    "destinations": ["gsiftp://destination/file"],
                }
            ]
        }
        self.app.post(
            url="/jobs",
            content_type="application/json",
            params=json.dumps(job),
            status=403,
        )
        # The other way around
        job = {
            "files": [
                {
                    "sources": ["gsiftp://destination/file"],
                    "destinations": ["gsiftp://source/path/"],
                }
            ]
        }
        self.app.post(
            url="/jobs",
            content_type="application/json",
            params=json.dumps(job),
            status=403,
        )

    def test_ban_se_with_submission(self):
        """
        Ban a SE but allowing submissions
        """
        self.push_delegation()
        self.app.post(
            url="/ban/se",
            params={
                "storage": "gsiftp://source",
                "status": "wait",
                "allow_submit": True,
            },
            status=200,
        )
        job = {
            "files": [
                {
                    "sources": ["gsiftp://source/path/"],
                    "destinations": ["gsiftp://destination/file"],
                }
            ]
        }
        job_id = self.app.post(
            url="/jobs",
            content_type="application/json",
            params=json.dumps(job),
            status=200,
        ).json["job_id"]
        files = Session.query(File).filter(File.job_id == job_id)
        for f in files:
            self.assertEqual("ON_HOLD", f.file_state)
        # The other way around
        job = {
            "files": [
                {
                    "sources": ["gsiftp://destination/file"],
                    "destinations": ["gsiftp://source/path/"],
                }
            ]
        }
        job_id = self.app.post(
            url="/jobs",
            content_type="application/json",
            params=json.dumps(job),
            status=200,
        ).json["job_id"]
        files = Session.query(File).filter(File.job_id == job_id)
        for f in files:
            self.assertEqual("ON_HOLD", f.file_state)

    def test_unban_wait(self):
        """
        Regression for FTS-297
        When unbanning a storage, if any file was left on wait, they must re-enter the queue
        """
        job_id = insert_job(
            "testvo",
            "gsiftp://source",
            "gsiftp://destination",
            "SUBMITTED",
            user_dn="/DC=cern/CN=someone",
        )
        self.app.post(
            url="/ban/se",
            params={
                "storage": "gsiftp://source",
                "status": "wait",
                "allow_submit": True,
            },
            status=200,
        )
        files = Session.query(File).filter(File.job_id == job_id)
        for f in files:
            self.assertEqual("ON_HOLD", f.file_state)
        self.app.delete(url="/ban/se?storage=%s" % quote("gsiftp://source"), status=204)
        files = Session.query(File).filter(File.job_id == job_id)
        for f in files:
            self.assertEqual("SUBMITTED", f.file_state)

    # Some requests that must be rejected
    def test_ban_dn_empty(self):
        """
        Banning with a missing dn must fail
        """
        self.app.post_json(url="/ban/dn", params={}, status=400)

    def test_unban_dn_empty(self):
        """
        Unbanning with a missing dn must fail
        """
        self.app.delete(url="/ban/dn", status=400)

    def test_ban_se_empty(self):
        """
        Ask for banning with a missing storage must fail
        """
        self.app.post_json(url="/ban/se", params={}, status=400)

    def test_unban_se_empty(self):
        """
        Unbanning with a missing se must fail
        """
        self.app.delete(url="/ban/se", status=400)

    def test_ban_se_cancel_and_submit(self):
        """
        Setting status = cancel and ask for allow_submit must fail
        """
        self.app.post(
            url="/ban/se",
            params={
                "storage": "gsiftp://source",
                "status": "cancel",
                "allow_submit": True,
            },
            status=400,
        )

    def test_ban_se_bad_status(self):
        """
        Unbanning with something else than cancel or wait must fail
        """
        self.app.post(
            url="/ban/se",
            params={"storage": "gsiftp://source", "status": "blahblah"},
            status=400,
        )

    def test_ban_se_staging(self):
        """
        Ban a storage with transfers queued as STAGING, submit a new STAGING, unban.
        Final state must be STAGING
        """
        self.push_delegation()
        pre_job_id = insert_job(
            "testvo",
            "srm://source",
            "srm://destination",
            "STAGING",
            user_dn="/DC=cern/CN=someone",
        )
        self.app.post(
            url="/ban/se",
            params={"storage": "srm://source", "status": "wait", "allow_submit": True},
            status=200,
        )
        files = Session.query(File).filter(File.job_id == pre_job_id)
        for f in files:
            self.assertEqual("ON_HOLD_STAGING", f.file_state)
...
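
In this file, insert_job is a test helper rather than an autotest or FTS3 API: it builds a Job row plus one File row per (source, destination) pair, merges them through the SQLAlchemy Session, commits, and returns the generated job ID for the test to assert against. A minimal usage sketch, assuming the helper and the fts3rest models imported above (the keyword arguments shown are exactly the ones the helper reads: user_dn, duration, queued, multiple, reason, thr; the reason value below is illustrative):

# Sketch only: relies on the insert_job helper defined in the snippet above.
# A single-pair job in the SUBMITTED state:
job_id = insert_job("testvo", "gsiftp://source", "gsiftp://destination", "SUBMITTED")

# A finished job: duration/queued also populate finish_time, start_time and tx_duration.
failed_id = insert_job(
    "testvo", "gsiftp://source", "gsiftp://destination2", "FAILED",
    duration=10, queued=20, reason="Transfer failed",  # reason value is hypothetical
)

# One job with several transfer pairs; each pair becomes its own File row.
multi_id = insert_job(
    "testvo",
    multiple=[
        ("gsiftp://source", "gsiftp://destination"),
        ("gsiftp://other", "gsiftp://destination"),
    ],
)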

RandomOpt.py

Source: RandomOpt.py (GitHub)

import copy
from huristicPaper import huristicPaper
from random import shuffle
import pandas as pd


def RandomOpt(makespan, tardyAmount, resultList, dueList):
    newStruct = {}  # length = machine number
    hasTried = {}
    ######## Step 1: build a new dictionary keyed by machine ########
    for i in resultList.keys():
        hasTried[i] = False
        if resultList[i][0] not in newStruct:
            newStruct[resultList[i][0]] = {}
            newStruct[resultList[i][0]]["makespan"] = resultList[i][2]
            newStruct[resultList[i][0]]["jobList"] = [i]
        else:
            if resultList[i][2] > newStruct[resultList[i][0]]["makespan"]:
                newStruct[resultList[i][0]]["makespan"] = resultList[i][2]
            # Walk the list from the back and check whether the current i's
            # start time is after each element's end time
            totalLen = len(newStruct[resultList[i][0]]["jobList"])
            for j in range(totalLen - 1, -1, -1):  # was range(totalLen-1, -1): an empty range
                curKey = newStruct[resultList[i][0]]["jobList"][j]
                if resultList[curKey][2] <= resultList[i][1]:
                    newStruct[resultList[i][0]]["jobList"].insert(j + 1, i)
                    break
                elif j == 0 and resultList[curKey][1] >= resultList[i][2]:
                    newStruct[resultList[i][0]]["jobList"].insert(j, i)
                    break
    print("==========================================")
    for i in newStruct:
        print(i, "->", newStruct[i])
    iterateTime = 5
    opt_tardy = tardyAmount
    opt_makespan = makespan
    #####################################################################
    # Find the machines with the largest and the smallest makespan
    max_makespan = -99999
    max_makespan_machine = -1
    min_makespan = 99999
    min_makespan_machine = -1

    for i in range(len(newStruct)):
        if newStruct[i]["makespan"] > max_makespan:
            max_makespan = newStruct[i]["makespan"]
            max_makespan_machine = i
        if newStruct[i]["makespan"] < min_makespan:
            min_makespan = newStruct[i]["makespan"]
            min_makespan_machine = i

    ### On the max-makespan machine, randomly pick a (stage, job) that has not been tried yet
    insert_job_list = copy.deepcopy(newStruct[max_makespan_machine]["jobList"])
    shuffle(insert_job_list)

    insert_job = insert_job_list[0]
    for i in range(len(insert_job_list)):
        if not hasTried[insert_job_list[i]]:
            # Check that removing the insert job does not make the later tasks invalid
            original_index = newStruct[max_makespan_machine]["jobList"].index(insert_job_list[i])
            original_processing_time = (
                resultList[insert_job_list[i]][2] - resultList[insert_job_list[i]][1]
            )
            valid = True
            for k in range(original_index + 1, len(newStruct[max_makespan_machine]["jobList"])):
                # Check whether the job to move can run on the target machine
                # (and what if it is the same machine?)
                cur_stage_job = newStruct[max_makespan_machine]["jobList"][k]
                if (
                    cur_stage_job[0] == 1
                    and resultList[cur_stage_job][2] - original_processing_time
                    < resultList[(0, cur_stage_job[1])][1]
                ):
                    valid = False
                    break
            if not valid:
                continue
            hasTried[insert_job_list[i]] = True
            insert_job = insert_job_list[i]
            break

    print(insert_job)  # (stage, job_no)
    insert_job_protime = resultList[insert_job][2] - resultList[insert_job][1]
    # tbd: check that removing the insert job does not make the later tasks invalid
    ### Try every insertable position on the min-makespan machine and check whether
    ### the updated makespan beats the current best.
    #### Insertable = end time of the job before the slot (J_e) + stage 1 process time < stage 2 start time
    ##### Check: start time of the job after the slot (J_s) - end time of J_e > insert_job_protime
    for i in range(len(newStruct[min_makespan_machine]["jobList"]) - 2, -1, -1):
        curWork = newStruct[min_makespan_machine]["jobList"][i]  # (stage, job_no)
        # Is the position valid for the insert job itself?
        if insert_job[0] == 0:
            insert_second_stage = (insert_job[0] + 1, insert_job[1])  # (stage, job)
            # The earliest finish time would be later than the second stage's start time
            if resultList[curWork][2] + insert_job_protime > resultList[insert_second_stage][1]:
                continue
        else:
            insert_first_stage = (insert_job[0] - 1, insert_job[1])
            # The start position is already earlier than the end of the first stage
            if resultList[curWork][2] < resultList[insert_first_stage][2]:
                continue
        # Is the gap big enough that the later jobs on this machine stay untouched?
        after_curWork = newStruct[min_makespan_machine]["jobList"][i + 1]
        if resultList[after_curWork][1] - resultList[curWork][2] > insert_job_protime:
            # Insert
            newStruct[min_makespan_machine]["jobList"].insert(i + 1, insert_job)
            # Remove
            newStruct[max_makespan_machine]["jobList"].remove(insert_job)
            # Update makespan
            lastElement = newStruct[max_makespan_machine]["jobList"][-1]
            newStruct[max_makespan_machine]["makespan"] = resultList[lastElement][2]
            # Update resultList
            oldEndTime = resultList[insert_job][2]
            resultList[insert_job][0] = min_makespan_machine
            resultList[insert_job][1] = resultList[curWork][2]
            resultList[insert_job][2] = resultList[insert_job][1] + insert_job_protime
            # Update the optimal solution (due times are keyed by job number)
            if oldEndTime > dueList[insert_job[1]] and resultList[insert_job][2] < dueList[insert_job[1]]:
                opt_tardy -= 1
            # Re-scan for the longest makespan
            temp_max_makespan = -1
            for key in resultList.keys():
                if resultList[key][2] > temp_max_makespan:
                    temp_max_makespan = resultList[key][2]
            if temp_max_makespan < opt_makespan:
                opt_makespan = temp_max_makespan
            break
        # If no gap avoids moving anything, insert starting from the end:
        # check whether the jobs after the slot would become invalid,
        # then insert and shift the subsequent jobs.

    ### -> If the tardy count beats the current best, update the result list and newStruct
    ### Return the new result list, tardy and makespan
    return 0


filepath = "data/instance 2.csv"
makespan, tardyAmount, resultList = huristicPaper(filepath)
for i in resultList:
    print(i, " -> ", resultList[i])
print(resultList)
# Check and import due time
df = pd.read_csv(filepath)
dueList = {}
for index, row in df.iterrows():
    dueList[index] = row['Due Time']
...
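
Unlike the FTS3 snippet, insert_job here is a local variable holding a (stage, job_no) tuple picked from the busiest machine, not a function. The data layout is only implied by the indexing: each resultList key is a (stage, job_no) tuple, each value is [machine, start_time, end_time], and dueList maps a job number to its due time (all produced by huristicPaper, which is not shown). A hypothetical toy instance, just to make those inferred shapes concrete:

# Hypothetical data, shaped the way RandomOpt indexes it.
resultList = {
    (0, 0): [0, 0, 4],   # stage 0 of job 0: machine 0, start t=0, end t=4
    (1, 0): [1, 4, 7],   # stage 1 of job 0: machine 1, start t=4, end t=7
    (0, 1): [0, 4, 9],
    (1, 1): [1, 9, 12],
}
dueList = {0: 8, 1: 10}  # due time per job number
RandomOpt(makespan=12, tardyAmount=1, resultList=resultList, dueList=dueList)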

integration.py

Source: integration.py (GitHub)

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import psycopg2
from airflow.utils.state import State as DagState
from airflow.version import version as AIRFLOW_VERSION
from marquez_airflow.version import VERSION as MARQUEZ_AIRFLOW_VERSION
from marquez_client import MarquezClient
from marquez_client.models import (
    DatasetType,
    JobType,
    RunState
)
from retrying import retry

log = logging.getLogger(__name__)

NAMESPACE_NAME = 'food_delivery'

# AIRFLOW
DAG_ID = 'orders_popular_day_of_week'
DAG_OWNER = 'anonymous'
DAG_DESCRIPTION = \
    'Determines the popular day of week orders are placed.'
IF_NOT_EXISTS_TASK_ID = 'if_not_exists'
INSERT_TASK_ID = 'insert'

# SOURCE
SOURCE_NAME = 'food_delivery_db'
CONNECTION_URL = \
    'postgres://food_delivery:food_delivery@postgres:5432/food_delivery'

# DATASETS
IN_TABLE_NAME = 'public.top_delivery_times'
IN_TABLE_PHYSICAL_NAME = IN_TABLE_NAME
OUT_TABLE_NAME = 'public.popular_orders_day_of_week'
OUT_TABLE_PHYSICAL_NAME = OUT_TABLE_NAME
OUT_TABLE_FIELDS = [
    {
        'name': 'order_day_of_week',
        'type': 'VARCHAR',
        'tags': [],
        'description': None
    },
    {
        'name': 'order_placed_on',
        'type': 'TIMESTAMP',
        'tags': [],
        'description': None
    },
    {
        'name': 'orders_placed',
        'type': 'INT4',
        'tags': [],
        'description': None
    }
]

client = MarquezClient(url='http://marquez:5000')

airflow_db_conn = psycopg2.connect(
    host="postgres",
    database="airflow",
    user="airflow",
    password="airflow"
)
airflow_db_conn.autocommit = True


@retry(
    wait_exponential_multiplier=1000,
    wait_exponential_max=10000
)
def wait_for_dag():
    log.info(
        f"Waiting for DAG '{DAG_ID}'..."
    )
    cur = airflow_db_conn.cursor()
    cur.execute(
        f"""
        SELECT dag_id, state
        FROM dag_run
        WHERE dag_id = '{DAG_ID}';
        """
    )
    row = cur.fetchone()
    dag_id = row[0]
    dag_state = row[1]
    cur.close()
    log.info(f"DAG '{dag_id}' state set to '{dag_state}'.")
    if dag_state != DagState.SUCCESS:
        raise Exception('Retry!')


def check_namespace_meta():
    namespace = client.get_namespace(NAMESPACE_NAME)
    assert namespace['name'] == NAMESPACE_NAME
    assert namespace['ownerName'] == DAG_OWNER
    assert namespace['description'] is None


def check_source_meta():
    source = client.get_source(SOURCE_NAME)
    assert source['type'] == 'POSTGRESQL'
    assert source['name'] == SOURCE_NAME
    assert source['connectionUrl'] == CONNECTION_URL
    assert source['description'] is None


def check_datasets_meta():
    in_table = client.get_dataset(
        namespace_name=NAMESPACE_NAME,
        dataset_name=IN_TABLE_NAME
    )
    assert in_table['id'] == {
        'namespace': NAMESPACE_NAME,
        'name': IN_TABLE_NAME
    }
    assert in_table['type'] == DatasetType.DB_TABLE.value
    assert in_table['name'] == IN_TABLE_NAME
    assert in_table['physicalName'] == IN_TABLE_PHYSICAL_NAME
    assert in_table['namespace'] == NAMESPACE_NAME
    assert in_table['sourceName'] == SOURCE_NAME
    assert len(in_table['fields']) == 0
    assert len(in_table['tags']) == 0
    assert in_table['lastModifiedAt'] is None
    assert in_table['description'] is None

    out_table = client.get_dataset(
        namespace_name=NAMESPACE_NAME,
        dataset_name=OUT_TABLE_NAME
    )
    assert out_table['id'] == {
        'namespace': NAMESPACE_NAME,
        'name': OUT_TABLE_NAME
    }
    assert out_table['type'] == DatasetType.DB_TABLE.value
    assert out_table['name'] == OUT_TABLE_NAME
    assert out_table['physicalName'] == OUT_TABLE_PHYSICAL_NAME
    assert out_table['namespace'] == NAMESPACE_NAME
    assert out_table['sourceName'] == SOURCE_NAME
    assert out_table['fields'] == OUT_TABLE_FIELDS
    assert len(out_table['tags']) == 0
    assert out_table['lastModifiedAt'] is not None
    assert out_table['description'] is None


def check_jobs_meta():
    if_not_exists_job = client.get_job(
        namespace_name=NAMESPACE_NAME,
        job_name=f"{DAG_ID}.{IF_NOT_EXISTS_TASK_ID}"
    )
    assert if_not_exists_job['id'] == {
        'namespace': NAMESPACE_NAME,
        'name': f"{DAG_ID}.{IF_NOT_EXISTS_TASK_ID}"
    }
    assert if_not_exists_job['type'] == JobType.BATCH.value
    assert if_not_exists_job['namespace'] == NAMESPACE_NAME
    assert len(if_not_exists_job['inputs']) == 0
    assert len(if_not_exists_job['outputs']) == 0
    assert if_not_exists_job['location'] is None
    assert if_not_exists_job['context']['sql'] is not None
    assert if_not_exists_job['context']['airflow.operator'] == \
        'airflow.operators.postgres_operator.PostgresOperator'
    assert if_not_exists_job['context']['airflow.task_info'] is not None
    assert if_not_exists_job['context']['airflow.version'] == AIRFLOW_VERSION
    assert if_not_exists_job['context']['marquez_airflow.version'] == MARQUEZ_AIRFLOW_VERSION
    assert if_not_exists_job['description'] == DAG_DESCRIPTION
    assert if_not_exists_job['latestRun']['state'] == RunState.COMPLETED.value

    insert_job = client.get_job(
        namespace_name=NAMESPACE_NAME,
        job_name=f"{DAG_ID}.{INSERT_TASK_ID}"
    )
    assert insert_job['id'] == {
        'namespace': NAMESPACE_NAME,
        'name': f"{DAG_ID}.{INSERT_TASK_ID}"
    }
    assert insert_job['type'] == JobType.BATCH.value
    assert insert_job['namespace'] == NAMESPACE_NAME
    assert insert_job['inputs'] == [{
        'namespace': NAMESPACE_NAME,
        'name': IN_TABLE_NAME
    }]
    assert insert_job['outputs'] == [{
        'namespace': NAMESPACE_NAME,
        'name': OUT_TABLE_NAME
    }]
    assert insert_job['location'] is None
    assert insert_job['context']['sql'] is not None
    assert insert_job['context']['airflow.operator'] == \
        'airflow.operators.postgres_operator.PostgresOperator'
    assert insert_job['context']['airflow.task_info'] is not None
    assert insert_job['context']['airflow.version'] == AIRFLOW_VERSION
    assert insert_job['context']['marquez_airflow.version'] == MARQUEZ_AIRFLOW_VERSION
    assert insert_job['description'] == DAG_DESCRIPTION
    assert insert_job['latestRun']['state'] == RunState.COMPLETED.value


def check_jobs_run_meta():
    if_not_exists_job = client.get_job(
        namespace_name=NAMESPACE_NAME,
        job_name=f"{DAG_ID}.{IF_NOT_EXISTS_TASK_ID}"
    )
    if_not_exists_job_run = client.get_job_run(
        run_id=if_not_exists_job['latestRun']['id']
    )
    assert if_not_exists_job_run['id'] == if_not_exists_job['latestRun']['id']
    assert if_not_exists_job_run['state'] == RunState.COMPLETED.value

    insert_job = client.get_job(
        namespace_name=NAMESPACE_NAME,
        job_name=f"{DAG_ID}.{INSERT_TASK_ID}"
    )
    insert_job_run = client.get_job_run(
        run_id=insert_job['latestRun']['id']
    )
    assert insert_job_run['id'] == insert_job['latestRun']['id']
    assert insert_job_run['state'] == RunState.COMPLETED.value


def main():
    # (1) Wait for DAG to complete
    wait_for_dag()
    # (2) Run checks on DAG metadata collected
    check_namespace_meta()
    check_source_meta()
    check_datasets_meta()
    check_jobs_meta()
    check_jobs_run_meta()


if __name__ == "__main__":
...
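
Here insert_job is just a dictionary describing the Airflow task named insert, fetched from Marquez. The reusable piece is the polling: retrying's @retry re-invokes wait_for_dag with exponential backoff (starting around 1 s and capped at 10 s, retrying for as long as the function raises). A minimal sketch of the same pattern with a hypothetical is_ready predicate:

from retrying import retry

@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
def wait_until_ready(is_ready):
    # Raising any exception tells the retrying decorator to try again.
    if not is_ready():
        raise Exception('Retry!')

# Example: blocks until the (hypothetical) readiness check passes.
wait_until_ready(lambda: True)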

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

You can also refer to the LambdaTest YouTube channel for step-by-step video demonstrations from industry experts.
