How to use the new_job method in yandex-tank

Best Python code snippet using yandex-tank

schedule_worker.py

Source:schedule_worker.py Github

copy

Full Screen

1#!/usr/bin/env python2#3# Copyright (C) 2018 The Android Open Source Project4#5# Licensed under the Apache License, Version 2.0 (the "License");6# you may not use this file except in compliance with the License.7# You may obtain a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16#17import datetime18import itertools19import logging20import re21from google.appengine.ext import ndb22from webapp.src import vtslab_status as Status23from webapp.src.proto import model24from webapp.src.utils import logger25import webapp226MAX_LOG_CHARACTERS = 10000 # maximum number of characters per each log27BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS = 60 # retry minutes when boot-up error is occurred28CREATE_JOB_SUCCESS = "success"29CREATE_JOB_FAILED_NO_BUILD = "no_build"30CREATE_JOB_FAILED_NO_DEVICE = "no_device"31def GetTestVersionType(manifest_branch, gsi_branch, test_type=0):32 """Compares manifest branch and gsi branch to get test type.33 This function only completes two LSBs which represent version related34 test type.35 Args:36 manifest_branch: a string, manifest branch name.37 gsi_branch: a string, gsi branch name.38 test_type: an integer, previous test type value.39 Returns:40 An integer, test type value.41 """42 if not test_type:43 value = 044 else:45 # clear two bits46 value = test_type & ~(1 | 1 << 1)47 if not manifest_branch:48 logging.debug("manifest branch cannot be empty or None.")49 return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]50 if not gsi_branch:51 logging.debug("gsi_branch is empty.")52 return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]53 gcs_pattern = "^gs://.*/v([0-9.]*)/.*"54 q_pattern = 
"(git_)?(aosp-)?q.*"55 p_pattern = "(git_)?(aosp-)?p.*"56 o_mr1_pattern = "(git_)?(aosp-)?o[^-]*-m.*"57 o_pattern = "(git_)?(aosp-)?o.*"58 master_pattern = "(git_)?(aosp-)?master"59 gcs_search = re.search(gcs_pattern, manifest_branch)60 if gcs_search:61 device_version = gcs_search.group(1)62 elif re.match(q_pattern, manifest_branch):63 device_version = "10.0"64 elif re.match(p_pattern, manifest_branch):65 device_version = "9.0"66 elif re.match(o_mr1_pattern, manifest_branch):67 device_version = "8.1"68 elif re.match(o_pattern, manifest_branch):69 device_version = "8.0"70 elif re.match(master_pattern, manifest_branch):71 device_version = "master"72 else:73 logging.debug("Unknown device version.")74 return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]75 gcs_search = re.search(gcs_pattern, gsi_branch)76 if gcs_search:77 gsi_version = gcs_search.group(1)78 elif re.match(q_pattern, gsi_branch):79 gsi_version = "10.0"80 elif re.match(p_pattern, gsi_branch):81 gsi_version = "9.0"82 elif re.match(o_mr1_pattern, gsi_branch):83 gsi_version = "8.1"84 elif re.match(o_pattern, gsi_branch):85 gsi_version = "8.0"86 elif re.match(master_pattern, gsi_branch):87 gsi_version = "master"88 else:89 logging.debug("Unknown gsi version.")90 return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]91 if device_version == gsi_version:92 return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]93 else:94 return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_OTA]95class ScheduleHandler(webapp2.RequestHandler):96 """Background worker class for /worker/schedule_handler.97 This class pull tasks from 'queue-schedule' queue and processes in98 background service 'worker'.99 Attributes:100 logger: Logger class101 """102 logger = logger.Logger()103 def ReserveDevices(self, target_device_serials):104 """Reserves devices.105 Args:106 target_device_serials: a list of strings, containing target device107 serial numbers.108 """109 device_query = model.DeviceModel.query(110 
model.DeviceModel.serial.IN(target_device_serials))111 devices = device_query.fetch()112 devices_to_put = []113 for device in devices:114 device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[115 "reserved"]116 devices_to_put.append(device)117 if devices_to_put:118 ndb.put_multi(devices_to_put)119 def FindBuildId(self, artifact_type, manifest_branch, target,120 signed=False):121 """Finds a designated build ID.122 Args:123 artifact_type: a string, build artifact type.124 manifest_branch: a string, build manifest branch.125 target: a string which build target and type are joined by '-'.126 signed: a boolean to get a signed build.127 Return:128 string, build ID found.129 """130 build_id = ""131 if "-" in target:132 build_target, build_type = target.split("-")133 else:134 build_target = target135 build_type = ""136 if not artifact_type or not manifest_branch or not build_target:137 self.logger.Println("The argument format is invalid.")138 return build_id139 build_query = model.BuildModel.query(140 model.BuildModel.artifact_type == artifact_type,141 model.BuildModel.manifest_branch == manifest_branch,142 model.BuildModel.build_target == build_target,143 model.BuildModel.build_type == build_type)144 builds = build_query.fetch()145 if builds:146 builds = [147 build for build in builds148 if (build.timestamp >149 datetime.datetime.now() - datetime.timedelta(hours=72))150 ]151 if builds:152 self.logger.Println("-- Found build ID")153 builds.sort(key=lambda x: x.build_id, reverse=True)154 for build in builds:155 if not signed or build.signed:156 build_id = build.build_id157 break158 return build_id159 def post(self):160 self.logger.Clear()161 manual_job = False162 schedule_key = self.request.get("schedule_key")163 if schedule_key:164 key = ndb.key.Key(urlsafe=schedule_key)165 manual_job = True166 schedules = [key.get()]167 else:168 schedule_query = model.ScheduleModel.query(169 model.ScheduleModel.suspended != True)170 schedules = schedule_query.fetch()171 if 
schedules:172 # filter out the schedules which are not updated within 72 hours.173 schedules = [174 schedule for schedule in schedules175 if (schedule.timestamp >176 datetime.datetime.now() - datetime.timedelta(hours=72))177 ]178 schedules = self.FilterWithPeriod(schedules)179 if schedules:180 schedules.sort(key=lambda x: self.GetProductName(x))181 group_by_product = [182 list(g)183 for _, g in itertools.groupby(schedules,184 lambda x: self.GetProductName(x))185 ]186 for group in group_by_product:187 group.sort(key=lambda x: x.priority_value if (188 x.priority_value) else Status.GetPriorityValue(x.priority))189 create_result = {190 CREATE_JOB_SUCCESS: [],191 CREATE_JOB_FAILED_NO_BUILD: [],192 CREATE_JOB_FAILED_NO_DEVICE: []193 }194 for schedule in group:195 self.logger.Println("")196 self.logger.Println("Schedule: %s (branch: %s)" %197 (schedule.test_name,198 schedule.manifest_branch))199 self.logger.Println(200 "Build Target: %s" % schedule.build_target)201 self.logger.Println("Device: %s" % schedule.device)202 self.logger.Indent()203 result, lab = self.CreateJob(schedule, manual_job)204 if result == CREATE_JOB_SUCCESS:205 create_result[result].append(lab)206 else:207 create_result[result].append(schedule)208 self.logger.Unindent()209 # if any schedule in group created a job, increase priority of210 # the schedules which couldn't create due to out of devices.211 schedules_to_put = []212 for lab in create_result[CREATE_JOB_SUCCESS]:213 for schedule in create_result[CREATE_JOB_FAILED_NO_DEVICE]:214 if any([lab in target for target in schedule.device215 ]) and schedule not in schedules_to_put:216 if schedule.priority_value is None:217 schedule.priority_value = (218 Status.GetPriorityValue(schedule.priority))219 if schedule.priority_value > 0:220 schedule.priority_value -= 1221 schedules_to_put.append(schedule)222 if schedules_to_put:223 ndb.put_multi(schedules_to_put)224 self.logger.Println("Scheduling completed.")225 lines = self.logger.Get()226 lines = 
[line.strip() for line in lines]227 outputs = []228 chars = 0229 for line in lines:230 chars += len(line)231 if chars > MAX_LOG_CHARACTERS:232 logging.info("\n".join(outputs))233 outputs = []234 chars = len(line)235 outputs.append(line)236 logging.info("\n".join(outputs))237 def CreateJob(self, schedule, manual_job=False):238 """Creates a job for given schedule.239 Args:240 schedule: model.ScheduleModel instance.241 manual_job: True if a job is created by a user, False otherwise.242 Returns:243 a string of job creation result message.244 a string of lab name if job is created, otherwise empty string.245 """246 target_host, target_device, target_device_serials = (247 self.SelectTargetLab(schedule))248 if not target_host:249 return CREATE_JOB_FAILED_NO_DEVICE, ""250 self.logger.Println("- Target host: %s" % target_host)251 self.logger.Println("- Target device: %s" % target_device)252 self.logger.Println("- Target serials: %s" % target_device_serials)253 # create job and add.254 new_job = model.JobModel()255 new_job.hostname = target_host256 new_job.priority = schedule.priority257 new_job.test_name = schedule.test_name258 new_job.require_signed_device_build = (259 schedule.require_signed_device_build)260 new_job.device = target_device261 new_job.period = schedule.period262 new_job.serial.extend(target_device_serials)263 new_job.build_storage_type = schedule.build_storage_type264 new_job.manifest_branch = schedule.manifest_branch265 new_job.build_target = schedule.build_target266 new_job.pab_account_id = schedule.device_pab_account_id267 new_job.shards = schedule.shards268 new_job.param = schedule.param269 new_job.retry_count = schedule.retry_count270 new_job.gsi_storage_type = schedule.gsi_storage_type271 new_job.gsi_branch = schedule.gsi_branch272 new_job.gsi_build_target = schedule.gsi_build_target273 new_job.gsi_pab_account_id = schedule.gsi_pab_account_id274 new_job.gsi_vendor_version = schedule.gsi_vendor_version275 new_job.test_storage_type = 
schedule.test_storage_type276 new_job.test_branch = schedule.test_branch277 new_job.test_build_target = schedule.test_build_target278 new_job.test_pab_account_id = schedule.test_pab_account_id279 new_job.parent_schedule = schedule.key280 new_job.image_package_repo_base = schedule.image_package_repo_base281 new_job.required_host_equipment = schedule.required_host_equipment282 new_job.required_device_equipment = schedule.required_device_equipment283 new_job.has_bootloader_img = schedule.has_bootloader_img284 new_job.has_radio_img = schedule.has_radio_img285 new_job.report_bucket = schedule.report_bucket286 new_job.report_spreadsheet_id = schedule.report_spreadsheet_id287 new_job.report_persistent_url = schedule.report_persistent_url288 new_job.report_reference_url = schedule.report_reference_url289 # uses bit 0-1 to indicate version.290 test_type = GetTestVersionType(schedule.manifest_branch,291 schedule.gsi_branch)292 # uses bit 2293 if schedule.require_signed_device_build:294 test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED]295 if manual_job:296 test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_MANUAL]297 new_job.test_type = test_type298 new_job.build_id = ""299 new_job.gsi_build_id = ""300 new_job.test_build_id = ""301 for artifact_type in ["device", "gsi", "test"]:302 if artifact_type == "device":303 storage_type_text = "build_storage_type"304 manifest_branch_text = "manifest_branch"305 build_target_text = "build_target"306 build_id_text = "build_id"307 signed = new_job.require_signed_device_build308 else:309 storage_type_text = artifact_type + "_storage_type"310 manifest_branch_text = artifact_type + "_branch"311 build_target_text = artifact_type + "_build_target"312 build_id_text = artifact_type + "_build_id"313 signed = False314 manifest_branch = getattr(new_job, manifest_branch_text)315 build_target = getattr(new_job, build_target_text)316 storage_type = getattr(new_job, storage_type_text)317 if storage_type == Status.STORAGE_TYPE_DICT["PAB"]:318 
build_id = self.FindBuildId(319 artifact_type=artifact_type,320 manifest_branch=manifest_branch,321 target=build_target,322 signed=signed)323 elif storage_type == Status.STORAGE_TYPE_DICT["GCS"]:324 # temp value to distinguish from empty values.325 build_id = "gcs"326 else:327 build_id = ""328 self.logger.Println(329 "Unexpected storage type (%s)." % storage_type)330 setattr(new_job, build_id_text, build_id)331 if ((not new_job.manifest_branch or new_job.build_id)332 and (not new_job.gsi_branch or new_job.gsi_build_id)333 and (not new_job.test_branch or new_job.test_build_id)):334 new_job.build_id = new_job.build_id.replace("gcs", "")335 new_job.gsi_build_id = (new_job.gsi_build_id.replace("gcs", ""))336 new_job.test_build_id = (new_job.test_build_id.replace("gcs", ""))337 self.ReserveDevices(target_device_serials)338 new_job.status = Status.JOB_STATUS_DICT["ready"]339 new_job.timestamp = datetime.datetime.now()340 new_job_key = new_job.put()341 schedule.children_jobs.append(new_job_key)342 schedule.priority_value = Status.GetPriorityValue(343 schedule.priority)344 schedule.put()345 self.logger.Println("A new job has been created.")346 labs = model.LabModel.query(347 model.LabModel.hostname == target_host).fetch()348 return CREATE_JOB_SUCCESS, labs[0].name349 else:350 self.logger.Println("Cannot find builds to create a job.")351 self.logger.Println("- Device branch / build - {} / {}".format(352 new_job.manifest_branch, new_job.build_id))353 self.logger.Println("- GSI branch / build - {} / {}".format(354 new_job.gsi_branch, new_job.gsi_build_id))355 self.logger.Println("- Test branch / build - {} / {}".format(356 new_job.test_branch, new_job.test_build_id))357 return CREATE_JOB_FAILED_NO_BUILD, ""358 def FilterWithPeriod(self, schedules):359 """Filters schedules with period.360 This method filters schedules if any children jobs are created within361 period time.362 Args:363 schedules: a list of model.ScheduleModel instances.364 Returns:365 a list of 
model.ScheduleModel instances which need to create a new366 job.367 """368 ret_list = []369 if not schedules:370 return ret_list371 if type(schedules) is not list:372 schedules = [schedules]373 for schedule in schedules:374 if not schedule.children_jobs:375 ret_list.append(schedule)376 continue377 latest_job_key = schedule.children_jobs[-1]378 latest_job = latest_job_key.get()379 if datetime.datetime.now() - latest_job.timestamp > (380 datetime.timedelta(381 minutes=self.GetCorrectedPeriod(schedule))):382 ret_list.append(schedule)383 return ret_list384 def SelectTargetLab(self, schedule):385 """Find target host and devices to schedule a new job.386 Args:387 schedule: a proto containing the information of a schedule.388 Returns:389 a string which represents hostname,390 a string containing target lab and product with '/' separator,391 a list of selected devices serial (see whether devices will be392 selected later when the job is picked up.)393 """394 available_devices = []395 for target_device in schedule.device:396 if "/" not in target_device:397 self.logger.Println(398 "Device malformed - {}".format(target_device))399 continue400 target_lab, target_product_type = target_device.split("/")401 self.logger.Println("- Lab %s" % target_lab)402 self.logger.Indent()403 host_query = model.LabModel.query(404 model.LabModel.name == target_lab)405 target_hosts = host_query.fetch()406 if target_hosts:407 for host in target_hosts:408 if not (set(schedule.required_host_equipment) <= set(409 host.host_equipment)):410 continue411 self.logger.Println("- Host: %s" % host.hostname)412 self.logger.Indent()413 device_query = model.DeviceModel.query(414 model.DeviceModel.hostname == host.hostname,415 model.DeviceModel.scheduling_status ==416 Status.DEVICE_SCHEDULING_STATUS_DICT["free"],417 model.DeviceModel.status.IN([418 Status.DEVICE_STATUS_DICT["fastboot"],419 Status.DEVICE_STATUS_DICT["online"],420 Status.DEVICE_STATUS_DICT["ready"]421 ]))422 host_devices = device_query.fetch()423 
host_devices = [424 x for x in host_devices425 if x.product.lower() == target_product_type.lower() and426 (set(schedule.required_device_equipment) <= set(427 x.device_equipment))428 ]429 if len(host_devices) < schedule.shards:430 self.logger.Println(431 "A host {} does not have enough devices. "432 "# of devices = {}, shards = {}".format(433 host.hostname, len(host_devices),434 schedule.shards))435 self.logger.Unindent()436 continue437 host_devices.sort(438 key=lambda x: (len(x.device_equipment)439 if x.device_equipment else 0))440 available_devices.append((host_devices, target_device))441 self.logger.Unindent()442 self.logger.Unindent()443 if not available_devices:444 self.logger.Println("No hosts have enough devices for schedule!")445 return None, None, []446 available_devices.sort(key=lambda x: (447 sum([len(y.device_equipment) for y in x[0][:schedule.shards]])))448 selected_host_devices = available_devices[0]449 return selected_host_devices[0][0].hostname, selected_host_devices[450 1], [x.serial for x in selected_host_devices[0][:schedule.shards]]451 def GetProductName(self, schedule):452 """Gets a product name from schedule instance.453 Args:454 schedule: a schedule instance.455 Returns:456 a string, product name in lowercase.457 """458 if not schedule or not schedule.device:459 return ""460 if "/" not in schedule.device[0]:461 return ""462 return schedule.device[0].split("/")[1].lower()463 def GetCorrectedPeriod(self, schedule):464 """Corrects and returns period value based on latest children jobs.465 Args:466 schedule: a model.ScheduleModel instance containing schedule467 information.468 Returns:469 an integer, corrected schedule period.470 """471 if not schedule.error_count or not schedule.children_jobs or (472 schedule.period <= BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS):473 return schedule.period474 latest_job = schedule.children_jobs[-1].get()475 if latest_job.status == Status.JOB_STATUS_DICT["bootup-err"]:476 return BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS477 
else:...

Full Screen

Full Screen

job_requeuing.py

Source:job_requeuing.py Github

copy

Full Screen

1from data_refinery_common.enums import Downloaders, ProcessorPipeline, SurveyJobTypes2from data_refinery_common.logging import get_and_configure_logger3from data_refinery_common.message_queue import send_job4from data_refinery_common.models import (5 DownloaderJob,6 DownloaderJobOriginalFileAssociation,7 ProcessorJob,8 ProcessorJobDatasetAssociation,9 ProcessorJobOriginalFileAssociation,10 SurveyJob,11 SurveyJobKeyValue,12)13logger = get_and_configure_logger(__name__)14def requeue_downloader_job(last_job: DownloaderJob) -> (bool, str):15 """Queues a new downloader job.16 The new downloader job will have num_retries one greater than17 last_job.num_retries.18 Returns True and the volume index of the downloader job upon successful dispatching,19 False and an empty string otherwise.20 """21 num_retries = last_job.num_retries + 122 ram_amount = last_job.ram_amount23 # If there's no start time then it's likely that the instance got24 # cycled which means we didn't get OOM-killed, so we don't need to25 # increase the RAM amount.26 if last_job.start_time and last_job.failure_reason is None:27 if ram_amount == 1024:28 ram_amount = 409629 elif ram_amount == 4096:30 ram_amount = 1638431 original_file = last_job.original_files.first()32 if not original_file:33 last_job.no_retry = True34 last_job.success = False35 last_job.failure_reason = (36 "Foreman told to requeue a DownloaderJob without an OriginalFile - why?!"37 )38 last_job.save()39 logger.info(40 "Foreman told to requeue a DownloaderJob without an OriginalFile - why?!",41 last_job=str(last_job),42 )43 return False44 if not original_file.needs_processing():45 last_job.no_retry = True46 last_job.success = False47 last_job.failure_reason = "Foreman told to redownload job with prior successful processing."48 last_job.save()49 logger.info(50 "Foreman told to redownload job with prior successful processing.",51 last_job=str(last_job),52 )53 return False54 first_sample = original_file.samples.first()55 # This is a magic 
string that all the dbGaP studies appear to have56 if first_sample and ("in the dbGaP study" in first_sample.title):57 last_job.no_retry = True58 last_job.success = False59 last_job.failure_reason = "Sample is dbGaP access controlled."60 last_job.save()61 logger.info(62 "Avoiding requeuing for DownloaderJob for dbGaP run accession: "63 + str(first_sample.accession_code)64 )65 return False66 new_job = DownloaderJob(67 num_retries=num_retries,68 downloader_task=last_job.downloader_task,69 ram_amount=ram_amount,70 accession_code=last_job.accession_code,71 was_recreated=last_job.was_recreated,72 )73 new_job.save()74 for original_file in last_job.original_files.all():75 DownloaderJobOriginalFileAssociation.objects.get_or_create(76 downloader_job=new_job, original_file=original_file77 )78 logger.debug(79 "Requeuing Downloader Job which had ID %d with a new Downloader Job with ID %d.",80 last_job.id,81 new_job.id,82 )83 try:84 if send_job(Downloaders[last_job.downloader_task], job=new_job, is_dispatch=True):85 last_job.retried = True86 last_job.success = False87 last_job.retried_job = new_job88 last_job.save()89 else:90 # Can't communicate with Batch just now, leave the job for a later loop.91 new_job.delete()92 return False93 except Exception:94 logger.error(95 "Failed to requeue DownloaderJob which had ID %d with a new DownloaderJob with ID %d.",96 last_job.id,97 new_job.id,98 )99 # Can't communicate with Batch just now, leave the job for a later loop.100 new_job.delete()101 return False102 return True103def requeue_processor_job(last_job: ProcessorJob) -> None:104 """Queues a new processor job.105 The new processor job will have num_retries one greater than106 last_job.num_retries.107 """108 num_retries = last_job.num_retries + 1109 # The Salmon pipeline is quite RAM-sensitive.110 # Try it again with an increased RAM amount, if possible.111 new_ram_amount = last_job.ram_amount112 # If there's no start time then it's likely that the instance got113 # cycled which means 
we didn't get OOM-killed, so we don't need to114 # increase the RAM amount.115 if last_job.start_time:116 # There's only one size of tximport jobs.117 if last_job.pipeline_applied == ProcessorPipeline.TXIMPORT.value:118 new_ram_amount = 32768119 # These initial values are set in common/job_lookup.py:determine_ram_amount120 elif last_job.pipeline_applied in [121 ProcessorPipeline.SALMON.value,122 ProcessorPipeline.TRANSCRIPTOME_INDEX_LONG.value,123 ProcessorPipeline.TRANSCRIPTOME_INDEX_SHORT.value,124 ProcessorPipeline.QN_REFERENCE.value,125 ]:126 if last_job.ram_amount == 4096:127 new_ram_amount = 8192128 if last_job.ram_amount == 8192:129 new_ram_amount = 12288130 elif last_job.ram_amount == 12288:131 new_ram_amount = 16384132 elif last_job.ram_amount == 16384:133 new_ram_amount = 32768134 elif last_job.ram_amount == 32768:135 new_ram_amount = 65536136 # The AFFY pipeline is somewhat RAM-sensitive.137 # Also NO_OP can fail and be retried, so we want to attempt ramping up ram.138 # Try it again with an increased RAM amount, if possible.139 elif (140 last_job.pipeline_applied == ProcessorPipeline.AFFY_TO_PCL.value141 or last_job.pipeline_applied == ProcessorPipeline.NO_OP.value142 ):143 if last_job.ram_amount == 2048:144 new_ram_amount = 4096145 elif last_job.ram_amount == 4096:146 new_ram_amount = 8192147 elif last_job.ram_amount == 8192:148 new_ram_amount = 32768149 elif (150 last_job.pipeline_applied == ProcessorPipeline.ILLUMINA_TO_PCL.value151 and "non-zero exit status -9" in last_job.failure_reason152 ):153 if last_job.ram_amount == 2048:154 new_ram_amount = 4096155 elif last_job.ram_amount == 4096:156 new_ram_amount = 8192157 new_job = ProcessorJob(158 downloader_job=last_job.downloader_job,159 num_retries=num_retries,160 pipeline_applied=last_job.pipeline_applied,161 ram_amount=new_ram_amount,162 batch_job_queue=last_job.batch_job_queue,163 )164 new_job.save()165 for original_file in last_job.original_files.all():166 
ProcessorJobOriginalFileAssociation.objects.get_or_create(167 processor_job=new_job, original_file=original_file168 )169 for dataset in last_job.datasets.all():170 ProcessorJobDatasetAssociation.objects.get_or_create(processor_job=new_job, dataset=dataset)171 try:172 logger.debug(173 "Requeuing Processor Job which had ID %d with a new Processor Job with ID %d.",174 last_job.id,175 new_job.id,176 )177 if send_job(ProcessorPipeline[last_job.pipeline_applied], job=new_job, is_dispatch=True):178 last_job.retried = True179 last_job.success = False180 last_job.retried_job = new_job181 last_job.save()182 else:183 # Can't communicate with Batch just now, leave the job for a later loop.184 new_job.delete()185 return False186 except Exception:187 logger.warn(188 "Failed to requeue Processor Job which had ID %d with a new Processor Job with ID %d.",189 last_job.id,190 new_job.id,191 exc_info=1,192 )193 # Can't communicate with Batch just now, leave the job for a later loop.194 new_job.delete()195 return False196 return True197def requeue_survey_job(last_job: SurveyJob) -> None:198 """Queues a new survey job.199 The new survey job will have num_retries one greater than200 last_job.num_retries.201 """202 num_retries = last_job.num_retries + 1203 new_job = SurveyJob(num_retries=num_retries, source_type=last_job.source_type)204 if new_job.num_retries == 1:205 new_job.ram_amount = 4096206 elif new_job.num_retries in [2, 3]:207 new_job.ram_amount = 16384208 else:209 new_job.ram_amount = 1024210 new_job.save()211 keyvalues = SurveyJobKeyValue.objects.filter(survey_job=last_job)212 for keyvalue in keyvalues:213 SurveyJobKeyValue.objects.get_or_create(214 survey_job=new_job, key=keyvalue.key, value=keyvalue.value,215 )216 logger.debug(217 "Requeuing SurveyJob which had ID %d with a new SurveyJob with ID %d.",218 last_job.id,219 new_job.id,220 )221 try:222 if send_job(SurveyJobTypes.SURVEYOR, job=new_job, is_dispatch=True):223 last_job.retried = True224 last_job.success = False225 
last_job.retried_job = new_job226 last_job.save()227 else:228 # Can't communicate with Batch just now, leave the job for a later loop.229 new_job.delete()230 except Exception:231 logger.error(232 "Failed to requeue Survey Job which had ID %d with a new Surevey Job with ID %d.",233 last_job.id,234 new_job.id,235 )236 # Can't communicate with AWS just now, leave the job for a later loop.237 new_job.delete()...

Full Screen

Full Screen

jobs.py

Source:jobs.py Github

copy

Full Screen

1import time2from orm import *3from crawler import Insta4from util import shuffle, rando_hour, log, DAY, WEEK, MONTH5def like(job, session=None):6 count = 07 try:8 insta = Insta()9 insta.login(username=job.i_user.username,10 password=job.i_user.get_password())11 time.sleep(1)12 # get users tags and shuffles them13 tag_names = [str(tag) for tag in job.i_user.tags]14 tags = shuffle(tag_names)15 for tag in tags:16 insta.search(tag)17 count += insta.like_tag(tag)18 time.sleep(5)19 except Exception as e:20 job.error = '{}: {}'.format(type(e), e)21 job.count = count22 job.finish()23 24 # new run for jobs25 new_job = schedule_next_job(job, rando_hour())26 session.add(new_job)27 session.commit()28 insta.driver.quit()29 return job30def follow(job, session=None):31 new_follows = []32 count = 033 try:34 insta = Insta()35 insta.login(username=job.i_user.username,36 password=job.i_user.get_password())37 time.sleep(1)38 # get users tags and shuffles them39 tag_names = [str(tag) for tag in job.i_user.tags]40 tags = shuffle(tag_names)41 for tag in tags:42 insta.search(tag)43 users, finished = insta.follow(tag)44 count += len(users)45 new_follows += users46 if finished is True:47 break48 time.sleep(5)49 except Exception as e:50 job.error = '{}: {}'.format(type(e), e)51 if len(new_follows) > 0:52 for user in new_follows:53 f = Following()54 f.timestamp = time.time()55 f.i_user = job.i_user56 f.other_user = user57 session.add(f)58 session.commit()59 job.count = count60 job.finish()61 # new run for jobs62 new_job = schedule_next_job(job, 1.5 * rando_hour())63 session.add(new_job)64 session.commit()65 insta.driver.quit()66 return job67def unfollow(job, session=None):68 deleted = []69 try:70 insta = Insta()71 insta.login(username=job.i_user.username,72 password=job.i_user.get_password())73 time.sleep(1)74 # get users to unfollow75 following = job.i_user.following[:5]76 filtered_following = [f.other_user for f in following if 77 f.timestamp < time.time() - WEEK]78 deleted = 
insta.unfollow(following=filtered_following)79 except Exception as e:80 job.error = '{}: {}'.format(type(e), e)81 82 # remove deleted follows from the database83 if deleted:84 deleted_follows = (session.query(Following)85 .filter(Following._user == job._user)86 .filter(Following.other_user.in_(deleted))87 .all())88 for follow in deleted_follows:89 session.delete(follow)90 session.commit()91 job.count = len(deleted)92 job.finish()93 # new run for job94 new_job = schedule_next_job(job, rando_hour())95 session.add(new_job)96 session.commit()97 insta.driver.quit()98 return job99def count_followers(job, session=None):100 insta = Insta()101 count = insta.count_followers(job.i_user.username)102 insta.driver.quit()103 104 job.count = count105 job.finish()106 new_job = schedule_next_job(job, DAY)107 session.add(new_job)108 session.commit()109 return job110def charge(job, session=None):111 p = Payment()112 try:113 user = job.i_user.user114 sub = job.i_user.subscription115 p.user = user116 p.timestamp = time.time()117 session.add(p)118 discount = [d for d in user.discounts if d.redeemed is False]119 cost = sub.cost120 if discount:121 for d in discount:122 cost -= d.amount123 d.redeemed = True124 session.commit()125 user.charge(cost)126 p.amount = (cost)127 p.paid_through = sub.months * MONTH128 if sub.type == 'continuous':129 next_charge = p.paid_through130 log('Charged {} ${}'.format(user.username, p.amount))131 except Exception as e:132 job.error = '{}: {}'.format(type(e), e)133 134 job.finish()135 session.commit()136 if next_charge > 0:137 new_job = Job(type='charge', run=next_charge)138 new_job.i_user = job.i_user139 session.add(new_job)140 session.commit()141 return job142def schedule_next_job(job, when):143 new_job = Job(type=job.type, run=time.time() + when)144 new_job.i_user = job.i_user...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful