Best Python code snippet using yandex-tank
schedule_worker.py
Source:schedule_worker.py  
#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import itertools
import logging
import re

from google.appengine.ext import ndb

from webapp.src import vtslab_status as Status
from webapp.src.proto import model
from webapp.src.utils import logger

import webapp2

MAX_LOG_CHARACTERS = 10000  # maximum number of characters per each log
BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS = 60  # retry minutes when boot-up error is occurred

CREATE_JOB_SUCCESS = "success"
CREATE_JOB_FAILED_NO_BUILD = "no_build"
CREATE_JOB_FAILED_NO_DEVICE = "no_device"


def GetTestVersionType(manifest_branch, gsi_branch, test_type=0):
    """Compares manifest branch and gsi branch to get test type.

    This function only completes two LSBs which represent version related
    test type.

    Args:
        manifest_branch: a string, manifest branch name.
        gsi_branch: a string, gsi branch name.
        test_type: an integer, previous test type value.

    Returns:
        An integer, test type value.
    """
    if not test_type:
        value = 0
    else:
        # clear two bits
        value = test_type & ~(1 | 1 << 1)

    if not manifest_branch:
        logging.debug("manifest branch cannot be empty or None.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    if not gsi_branch:
        logging.debug("gsi_branch is empty.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]

    gcs_pattern = "^gs://.*/v([0-9.]*)/.*"
    q_pattern = "(git_)?(aosp-)?q.*"
    p_pattern = "(git_)?(aosp-)?p.*"
    o_mr1_pattern = "(git_)?(aosp-)?o[^-]*-m.*"
    o_pattern = "(git_)?(aosp-)?o.*"
    master_pattern = "(git_)?(aosp-)?master"

    gcs_search = re.search(gcs_pattern, manifest_branch)
    if gcs_search:
        device_version = gcs_search.group(1)
    elif re.match(q_pattern, manifest_branch):
        device_version = "10.0"
    elif re.match(p_pattern, manifest_branch):
        device_version = "9.0"
    elif re.match(o_mr1_pattern, manifest_branch):
        device_version = "8.1"
    elif re.match(o_pattern, manifest_branch):
        device_version = "8.0"
    elif re.match(master_pattern, manifest_branch):
        device_version = "master"
    else:
        logging.debug("Unknown device version.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    gcs_search = re.search(gcs_pattern, gsi_branch)
    if gcs_search:
        gsi_version = gcs_search.group(1)
    elif re.match(q_pattern, gsi_branch):
        gsi_version = "10.0"
    elif re.match(p_pattern, gsi_branch):
        gsi_version = "9.0"
    elif re.match(o_mr1_pattern, gsi_branch):
        gsi_version = "8.1"
    elif re.match(o_pattern, gsi_branch):
        gsi_version = "8.0"
    elif re.match(master_pattern, gsi_branch):
        gsi_version = "master"
    else:
        logging.debug("Unknown gsi version.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    if device_version == gsi_version:
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]
    else:
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_OTA]


class ScheduleHandler(webapp2.RequestHandler):
    """Background worker class for /worker/schedule_handler.

    This class pull tasks from 'queue-schedule' queue and processes in
    background service 'worker'.

    Attributes:
        logger: Logger class
    """
    logger = logger.Logger()

    def ReserveDevices(self, target_device_serials):
        """Reserves devices.

        Args:
            target_device_serials: a list of strings, containing target device
                                   serial numbers.
        """
        device_query = model.DeviceModel.query(
            model.DeviceModel.serial.IN(target_device_serials))
        devices = device_query.fetch()
        devices_to_put = []
        for device in devices:
            device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
                "reserved"]
            devices_to_put.append(device)
        if devices_to_put:
            ndb.put_multi(devices_to_put)

    def FindBuildId(self, artifact_type, manifest_branch, target,
                    signed=False):
        """Finds a designated build ID.

        Args:
            artifact_type: a string, build artifact type.
            manifest_branch: a string, build manifest branch.
            target: a string which build target and type are joined by '-'.
            signed: a boolean to get a signed build.

        Return:
            string, build ID found.
        """
        build_id = ""
        if "-" in target:
            build_target, build_type = target.split("-")
        else:
            build_target = target
            build_type = ""
        if not artifact_type or not manifest_branch or not build_target:
            self.logger.Println("The argument format is invalid.")
            return build_id

        build_query = model.BuildModel.query(
            model.BuildModel.artifact_type == artifact_type,
            model.BuildModel.manifest_branch == manifest_branch,
            model.BuildModel.build_target == build_target,
            model.BuildModel.build_type == build_type)
        builds = build_query.fetch()

        if builds:
            builds = [
                build for build in builds
                if (build.timestamp >
                    datetime.datetime.now() - datetime.timedelta(hours=72))
            ]

        if builds:
            self.logger.Println("-- Found build ID")
            builds.sort(key=lambda x: x.build_id, reverse=True)
            for build in builds:
                if not signed or build.signed:
                    build_id = build.build_id
                    break

        return build_id

    def post(self):
        self.logger.Clear()
        manual_job = False
        schedule_key = self.request.get("schedule_key")
        if schedule_key:
            key = ndb.key.Key(urlsafe=schedule_key)
            manual_job = True
            schedules = [key.get()]
        else:
            schedule_query = model.ScheduleModel.query(
                model.ScheduleModel.suspended != True)
            schedules = schedule_query.fetch()

        if schedules:
            # filter out the schedules which are not updated within 72 hours.
            schedules = [
                schedule for schedule in schedules
                if (schedule.timestamp >
                    datetime.datetime.now() - datetime.timedelta(hours=72))
            ]
            schedules = self.FilterWithPeriod(schedules)

        if schedules:
            schedules.sort(key=lambda x: self.GetProductName(x))
            group_by_product = [
                list(g)
                for _, g in itertools.groupby(schedules,
                                              lambda x: self.GetProductName(x))
            ]
            for group in group_by_product:
                group.sort(key=lambda x: x.priority_value if (
                    x.priority_value) else Status.GetPriorityValue(x.priority))
                create_result = {
                    CREATE_JOB_SUCCESS: [],
                    CREATE_JOB_FAILED_NO_BUILD: [],
                    CREATE_JOB_FAILED_NO_DEVICE: []
                }
                for schedule in group:
                    self.logger.Println("")
                    self.logger.Println("Schedule: %s (branch: %s)" %
                                        (schedule.test_name,
                                         schedule.manifest_branch))
                    self.logger.Println(
                        "Build Target: %s" % schedule.build_target)
                    self.logger.Println("Device: %s" % schedule.device)
                    self.logger.Indent()
                    result, lab = self.CreateJob(schedule, manual_job)
                    if result == CREATE_JOB_SUCCESS:
                        create_result[result].append(lab)
                    else:
                        create_result[result].append(schedule)
                    self.logger.Unindent()

                # if any schedule in group created a job, increase priority of
                # the schedules which couldn't create due to out of devices.
                schedules_to_put = []
                for lab in create_result[CREATE_JOB_SUCCESS]:
                    for schedule in create_result[CREATE_JOB_FAILED_NO_DEVICE]:
                        if any([lab in target for target in schedule.device
                                ]) and schedule not in schedules_to_put:
                            if schedule.priority_value is None:
                                schedule.priority_value = (
                                    Status.GetPriorityValue(schedule.priority))
                            if schedule.priority_value > 0:
                                schedule.priority_value -= 1
                                schedules_to_put.append(schedule)

                if schedules_to_put:
                    ndb.put_multi(schedules_to_put)

        self.logger.Println("Scheduling completed.")
        lines = self.logger.Get()
        lines = [line.strip() for line in lines]
        outputs = []
        chars = 0
        for line in lines:
            chars += len(line)
            if chars > MAX_LOG_CHARACTERS:
                logging.info("\n".join(outputs))
                outputs = []
                chars = len(line)
            outputs.append(line)
        logging.info("\n".join(outputs))

    def CreateJob(self, schedule, manual_job=False):
        """Creates a job for given schedule.

        Args:
            schedule: model.ScheduleModel instance.
            manual_job: True if a job is created by a user, False otherwise.

        Returns:
            a string of job creation result message.
            a string of lab name if job is created, otherwise empty string.
        """
        target_host, target_device, target_device_serials = (
            self.SelectTargetLab(schedule))
        if not target_host:
            return CREATE_JOB_FAILED_NO_DEVICE, ""

        self.logger.Println("- Target host: %s" % target_host)
        self.logger.Println("- Target device: %s" % target_device)
        self.logger.Println("- Target serials: %s" % target_device_serials)

        # create job and add.
        new_job = model.JobModel()
        new_job.hostname = target_host
        new_job.priority = schedule.priority
        new_job.test_name = schedule.test_name
        new_job.require_signed_device_build = (
            schedule.require_signed_device_build)
        new_job.device = target_device
        new_job.period = schedule.period
        new_job.serial.extend(target_device_serials)
        new_job.build_storage_type = schedule.build_storage_type
        new_job.manifest_branch = schedule.manifest_branch
        new_job.build_target = schedule.build_target
        new_job.pab_account_id = schedule.device_pab_account_id
        new_job.shards = schedule.shards
        new_job.param = schedule.param
        new_job.retry_count = schedule.retry_count
        new_job.gsi_storage_type = schedule.gsi_storage_type
        new_job.gsi_branch = schedule.gsi_branch
        new_job.gsi_build_target = schedule.gsi_build_target
        new_job.gsi_pab_account_id = schedule.gsi_pab_account_id
        new_job.gsi_vendor_version = schedule.gsi_vendor_version
        new_job.test_storage_type = schedule.test_storage_type
        new_job.test_branch = schedule.test_branch
        new_job.test_build_target = schedule.test_build_target
        new_job.test_pab_account_id = schedule.test_pab_account_id
        new_job.parent_schedule = schedule.key
        new_job.image_package_repo_base = schedule.image_package_repo_base
        new_job.required_host_equipment = schedule.required_host_equipment
        new_job.required_device_equipment = schedule.required_device_equipment
        new_job.has_bootloader_img = schedule.has_bootloader_img
        new_job.has_radio_img = schedule.has_radio_img
        new_job.report_bucket = schedule.report_bucket
        new_job.report_spreadsheet_id = schedule.report_spreadsheet_id
        new_job.report_persistent_url = schedule.report_persistent_url
        new_job.report_reference_url = schedule.report_reference_url

        # uses bit 0-1 to indicate version.
        test_type = GetTestVersionType(schedule.manifest_branch,
                                       schedule.gsi_branch)
        # uses bit 2
        if schedule.require_signed_device_build:
            test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED]

        if manual_job:
            test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_MANUAL]

        new_job.test_type = test_type

        new_job.build_id = ""
        new_job.gsi_build_id = ""
        new_job.test_build_id = ""
        for artifact_type in ["device", "gsi", "test"]:
            if artifact_type == "device":
                storage_type_text = "build_storage_type"
                manifest_branch_text = "manifest_branch"
                build_target_text = "build_target"
                build_id_text = "build_id"
                signed = new_job.require_signed_device_build
            else:
                storage_type_text = artifact_type + "_storage_type"
                manifest_branch_text = artifact_type + "_branch"
                build_target_text = artifact_type + "_build_target"
                build_id_text = artifact_type + "_build_id"
                signed = False

            manifest_branch = getattr(new_job, manifest_branch_text)
            build_target = getattr(new_job, build_target_text)
            storage_type = getattr(new_job, storage_type_text)
            if storage_type == Status.STORAGE_TYPE_DICT["PAB"]:
                build_id = self.FindBuildId(
                    artifact_type=artifact_type,
                    manifest_branch=manifest_branch,
                    target=build_target,
                    signed=signed)
            elif storage_type == Status.STORAGE_TYPE_DICT["GCS"]:
                # temp value to distinguish from empty values.
                build_id = "gcs"
            else:
                build_id = ""
                self.logger.Println(
                    "Unexpected storage type (%s)." % storage_type)
            setattr(new_job, build_id_text, build_id)

        if ((not new_job.manifest_branch or new_job.build_id)
                and (not new_job.gsi_branch or new_job.gsi_build_id)
                and (not new_job.test_branch or new_job.test_build_id)):
            new_job.build_id = new_job.build_id.replace("gcs", "")
            new_job.gsi_build_id = (new_job.gsi_build_id.replace("gcs", ""))
            new_job.test_build_id = (new_job.test_build_id.replace("gcs", ""))
            self.ReserveDevices(target_device_serials)
            new_job.status = Status.JOB_STATUS_DICT["ready"]
            new_job.timestamp = datetime.datetime.now()
            new_job_key = new_job.put()
            schedule.children_jobs.append(new_job_key)
            schedule.priority_value = Status.GetPriorityValue(
                schedule.priority)
            schedule.put()
            self.logger.Println("A new job has been created.")
            labs = model.LabModel.query(
                model.LabModel.hostname == target_host).fetch()
            return CREATE_JOB_SUCCESS, labs[0].name
        else:
            self.logger.Println("Cannot find builds to create a job.")
            self.logger.Println("- Device branch / build - {} / {}".format(
                new_job.manifest_branch, new_job.build_id))
            self.logger.Println("- GSI branch / build - {} / {}".format(
                new_job.gsi_branch, new_job.gsi_build_id))
            self.logger.Println("- Test branch / build - {} / {}".format(
                new_job.test_branch, new_job.test_build_id))
            return CREATE_JOB_FAILED_NO_BUILD, ""

    def FilterWithPeriod(self, schedules):
        """Filters schedules with period.

        This method filters schedules if any children jobs are created within
        period time.

        Args:
            schedules: a list of model.ScheduleModel instances.

        Returns:
            a list of model.ScheduleModel instances which need to create a new
            job.
        """
        ret_list = []
        if not schedules:
            return ret_list

        if type(schedules) is not list:
            schedules = [schedules]
        for schedule in schedules:
            if not schedule.children_jobs:
                ret_list.append(schedule)
                continue
            latest_job_key = schedule.children_jobs[-1]
            latest_job = latest_job_key.get()
            if datetime.datetime.now() - latest_job.timestamp > (
                    datetime.timedelta(
                        minutes=self.GetCorrectedPeriod(schedule))):
                ret_list.append(schedule)
        return ret_list

    def SelectTargetLab(self, schedule):
        """Find target host and devices to schedule a new job.

        Args:
            schedule: a proto containing the information of a schedule.

        Returns:
            a string which represents hostname,
            a string containing target lab and product with '/' separator,
            a list of selected devices serial (see whether devices will be
            selected later when the job is picked up.)
        """
        available_devices = []
        for target_device in schedule.device:
            if "/" not in target_device:
                self.logger.Println(
                    "Device malformed - {}".format(target_device))
                continue
            target_lab, target_product_type = target_device.split("/")
            self.logger.Println("- Lab %s" % target_lab)
            self.logger.Indent()
            host_query = model.LabModel.query(
                model.LabModel.name == target_lab)
            target_hosts = host_query.fetch()
            if target_hosts:
                for host in target_hosts:
                    if not (set(schedule.required_host_equipment) <= set(
                            host.host_equipment)):
                        continue
                    self.logger.Println("- Host: %s" % host.hostname)
                    self.logger.Indent()
                    device_query = model.DeviceModel.query(
                        model.DeviceModel.hostname == host.hostname,
                        model.DeviceModel.scheduling_status ==
                        Status.DEVICE_SCHEDULING_STATUS_DICT["free"],
                        model.DeviceModel.status.IN([
                            Status.DEVICE_STATUS_DICT["fastboot"],
                            Status.DEVICE_STATUS_DICT["online"],
                            Status.DEVICE_STATUS_DICT["ready"]
                        ]))
                    host_devices = device_query.fetch()
                    host_devices = [
                        x for x in host_devices
                        if x.product.lower() == target_product_type.lower() and
                        (set(schedule.required_device_equipment) <= set(
                            x.device_equipment))
                    ]
                    if len(host_devices) < schedule.shards:
                        self.logger.Println(
                            "A host {} does not have enough devices. "
                            "# of devices = {}, shards = {}".format(
                                host.hostname, len(host_devices),
                                schedule.shards))
                        self.logger.Unindent()
                        continue
                    host_devices.sort(
                        key=lambda x: (len(x.device_equipment)
                                       if x.device_equipment else 0))
                    available_devices.append((host_devices, target_device))
                    self.logger.Unindent()
            self.logger.Unindent()

        if not available_devices:
            self.logger.Println("No hosts have enough devices for schedule!")
            return None, None, []

        available_devices.sort(key=lambda x: (
            sum([len(y.device_equipment) for y in x[0][:schedule.shards]])))
        selected_host_devices = available_devices[0]
        return selected_host_devices[0][0].hostname, selected_host_devices[
            1], [x.serial for x in selected_host_devices[0][:schedule.shards]]

    def GetProductName(self, schedule):
        """Gets a product name from schedule instance.

        Args:
            schedule: a schedule instance.

        Returns:
            a string, product name in lowercase.
        """
        if not schedule or not schedule.device:
            return ""

        if "/" not in schedule.device[0]:
            return ""

        return schedule.device[0].split("/")[1].lower()

    def GetCorrectedPeriod(self, schedule):
        """Corrects and returns period value based on latest children jobs.

        Args:
            schedule: a model.ScheduleModel instance containing schedule
                      information.

        Returns:
            an integer, corrected schedule period.
        """
        if not schedule.error_count or not schedule.children_jobs or (
                schedule.period <= BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS):
            return schedule.period

        latest_job = schedule.children_jobs[-1].get()
        if latest_job.status == Status.JOB_STATUS_DICT["bootup-err"]:
            return BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS
        else:
            ...
job_requeuing.py
Source:job_requeuing.py
from data_refinery_common.enums import Downloaders, ProcessorPipeline, SurveyJobTypes
from data_refinery_common.logging import get_and_configure_logger
from data_refinery_common.message_queue import send_job
from data_refinery_common.models import (
    DownloaderJob,
    DownloaderJobOriginalFileAssociation,
    ProcessorJob,
    ProcessorJobDatasetAssociation,
    ProcessorJobOriginalFileAssociation,
    SurveyJob,
    SurveyJobKeyValue,
)

logger = get_and_configure_logger(__name__)


def requeue_downloader_job(last_job: DownloaderJob) -> (bool, str):
    """Queues a new downloader job.

    The new downloader job will have num_retries one greater than
    last_job.num_retries.

    Returns True and the volume index of the downloader job upon successful dispatching,
    False and an empty string otherwise.
    """
    num_retries = last_job.num_retries + 1

    ram_amount = last_job.ram_amount
    # If there's no start time then it's likely that the instance got
    # cycled which means we didn't get OOM-killed, so we don't need to
    # increase the RAM amount.
    if last_job.start_time and last_job.failure_reason is None:
        if ram_amount == 1024:
            ram_amount = 4096
        elif ram_amount == 4096:
            ram_amount = 16384

    original_file = last_job.original_files.first()

    if not original_file:
        last_job.no_retry = True
        last_job.success = False
        last_job.failure_reason = (
            "Foreman told to requeue a DownloaderJob without an OriginalFile - why?!"
        )
        last_job.save()
        logger.info(
            "Foreman told to requeue a DownloaderJob without an OriginalFile - why?!",
            last_job=str(last_job),
        )
        return False

    if not original_file.needs_processing():
        last_job.no_retry = True
        last_job.success = False
        last_job.failure_reason = "Foreman told to redownload job with prior successful processing."
        last_job.save()
        logger.info(
            "Foreman told to redownload job with prior successful processing.",
            last_job=str(last_job),
        )
        return False

    first_sample = original_file.samples.first()

    # This is a magic string that all the dbGaP studies appear to have
    if first_sample and ("in the dbGaP study" in first_sample.title):
        last_job.no_retry = True
        last_job.success = False
        last_job.failure_reason = "Sample is dbGaP access controlled."
        last_job.save()
        logger.info(
            "Avoiding requeuing for DownloaderJob for dbGaP run accession: "
            + str(first_sample.accession_code)
        )
        return False

    new_job = DownloaderJob(
        num_retries=num_retries,
        downloader_task=last_job.downloader_task,
        ram_amount=ram_amount,
        accession_code=last_job.accession_code,
        was_recreated=last_job.was_recreated,
    )
    new_job.save()

    for original_file in last_job.original_files.all():
        DownloaderJobOriginalFileAssociation.objects.get_or_create(
            downloader_job=new_job, original_file=original_file
        )

    logger.debug(
        "Requeuing Downloader Job which had ID %d with a new Downloader Job with ID %d.",
        last_job.id,
        new_job.id,
    )
    try:
        if send_job(Downloaders[last_job.downloader_task], job=new_job, is_dispatch=True):
            last_job.retried = True
            last_job.success = False
            last_job.retried_job = new_job
            last_job.save()
        else:
            # Can't communicate with Batch just now, leave the job for a later loop.
            new_job.delete()
            return False
    except Exception:
        logger.error(
            "Failed to requeue DownloaderJob which had ID %d with a new DownloaderJob with ID %d.",
            last_job.id,
            new_job.id,
        )
        # Can't communicate with Batch just now, leave the job for a later loop.
        new_job.delete()
        return False

    return True


def requeue_processor_job(last_job: ProcessorJob) -> None:
    """Queues a new processor job.

    The new processor job will have num_retries one greater than
    last_job.num_retries.
    """
    num_retries = last_job.num_retries + 1

    # The Salmon pipeline is quite RAM-sensitive.
    # Try it again with an increased RAM amount, if possible.
    new_ram_amount = last_job.ram_amount

    # If there's no start time then it's likely that the instance got
    # cycled which means we didn't get OOM-killed, so we don't need to
    # increase the RAM amount.
    if last_job.start_time:
        # There's only one size of tximport jobs.
        if last_job.pipeline_applied == ProcessorPipeline.TXIMPORT.value:
            new_ram_amount = 32768
        # These initial values are set in common/job_lookup.py:determine_ram_amount
        elif last_job.pipeline_applied in [
            ProcessorPipeline.SALMON.value,
            ProcessorPipeline.TRANSCRIPTOME_INDEX_LONG.value,
            ProcessorPipeline.TRANSCRIPTOME_INDEX_SHORT.value,
            ProcessorPipeline.QN_REFERENCE.value,
        ]:
            if last_job.ram_amount == 4096:
                new_ram_amount = 8192
            if last_job.ram_amount == 8192:
                new_ram_amount = 12288
            elif last_job.ram_amount == 12288:
                new_ram_amount = 16384
            elif last_job.ram_amount == 16384:
                new_ram_amount = 32768
            elif last_job.ram_amount == 32768:
                new_ram_amount = 65536
        # The AFFY pipeline is somewhat RAM-sensitive.
        # Also NO_OP can fail and be retried, so we want to attempt ramping up ram.
        # Try it again with an increased RAM amount, if possible.
        elif (
            last_job.pipeline_applied == ProcessorPipeline.AFFY_TO_PCL.value
            or last_job.pipeline_applied == ProcessorPipeline.NO_OP.value
        ):
            if last_job.ram_amount == 2048:
                new_ram_amount = 4096
            elif last_job.ram_amount == 4096:
                new_ram_amount = 8192
            elif last_job.ram_amount == 8192:
                new_ram_amount = 32768
        elif (
            last_job.pipeline_applied == ProcessorPipeline.ILLUMINA_TO_PCL.value
            and "non-zero exit status -9" in last_job.failure_reason
        ):
            if last_job.ram_amount == 2048:
                new_ram_amount = 4096
            elif last_job.ram_amount == 4096:
                new_ram_amount = 8192

    new_job = ProcessorJob(
        downloader_job=last_job.downloader_job,
        num_retries=num_retries,
        pipeline_applied=last_job.pipeline_applied,
        ram_amount=new_ram_amount,
        batch_job_queue=last_job.batch_job_queue,
    )
    new_job.save()

    for original_file in last_job.original_files.all():
        ProcessorJobOriginalFileAssociation.objects.get_or_create(
            processor_job=new_job, original_file=original_file
        )

    for dataset in last_job.datasets.all():
        ProcessorJobDatasetAssociation.objects.get_or_create(processor_job=new_job, dataset=dataset)

    try:
        logger.debug(
            "Requeuing Processor Job which had ID %d with a new Processor Job with ID %d.",
            last_job.id,
            new_job.id,
        )
        if send_job(ProcessorPipeline[last_job.pipeline_applied], job=new_job, is_dispatch=True):
            last_job.retried = True
            last_job.success = False
            last_job.retried_job = new_job
            last_job.save()
        else:
            # Can't communicate with Batch just now, leave the job for a later loop.
            new_job.delete()
            return False
    except Exception:
        logger.warn(
            "Failed to requeue Processor Job which had ID %d with a new Processor Job with ID %d.",
            last_job.id,
            new_job.id,
            exc_info=1,
        )
        # Can't communicate with Batch just now, leave the job for a later loop.
        new_job.delete()
        return False

    return True


def requeue_survey_job(last_job: SurveyJob) -> None:
    """Queues a new survey job.

    The new survey job will have num_retries one greater than
    last_job.num_retries.
    """
    num_retries = last_job.num_retries + 1

    new_job = SurveyJob(num_retries=num_retries, source_type=last_job.source_type)

    if new_job.num_retries == 1:
        new_job.ram_amount = 4096
    elif new_job.num_retries in [2, 3]:
        new_job.ram_amount = 16384
    else:
        new_job.ram_amount = 1024

    new_job.save()

    keyvalues = SurveyJobKeyValue.objects.filter(survey_job=last_job)

    for keyvalue in keyvalues:
        SurveyJobKeyValue.objects.get_or_create(
            survey_job=new_job, key=keyvalue.key, value=keyvalue.value,
        )

    logger.debug(
        "Requeuing SurveyJob which had ID %d with a new SurveyJob with ID %d.",
        last_job.id,
        new_job.id,
    )

    try:
        if send_job(SurveyJobTypes.SURVEYOR, job=new_job, is_dispatch=True):
            last_job.retried = True
            last_job.success = False
            last_job.retried_job = new_job
            last_job.save()
        else:
            # Can't communicate with Batch just now, leave the job for a later loop.
            new_job.delete()
    except Exception:
        logger.error(
            "Failed to requeue Survey Job which had ID %d with a new Surevey Job with ID %d.",
            last_job.id,
            new_job.id,
        )
        # Can't communicate with AWS just now, leave the job for a later loop.
        new_job.delete()
        ...
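All three requeue_* helpers follow the same retry pattern: create a copy of the job with num_retries + 1 and, when the previous attempt actually started (and so was likely OOM-killed), request the next RAM tier before dispatching with send_job. Here is a minimal sketch of that RAM ladder using the downloader tiers shown above; RAM_TIERS, next_ram_amount, and got_oom_killed are illustrative names, not part of data_refinery_common.

# Illustrative sketch only: the retry/RAM-escalation idea reduced to plain data.
RAM_TIERS = [1024, 4096, 16384]  # downloader tiers from requeue_downloader_job


def next_ram_amount(current, got_oom_killed):
    """Return the RAM (in MB) to request on the next retry."""
    if not got_oom_killed:
        return current  # instance was cycled, not OOM-killed: keep the same size
    try:
        return RAM_TIERS[RAM_TIERS.index(current) + 1]
    except (ValueError, IndexError):
        return current  # unknown size or already at the top tier


assert next_ram_amount(1024, got_oom_killed=True) == 4096
assert next_ram_amount(16384, got_oom_killed=True) == 16384
assert next_ram_amount(4096, got_oom_killed=False) == 4096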
jobs.py
Source:jobs.py
import time

from orm import *
from crawler import Insta
from util import shuffle, rando_hour, log, DAY, WEEK, MONTH


def like(job, session=None):
    count = 0
    try:
        insta = Insta()
        insta.login(username=job.i_user.username,
                    password=job.i_user.get_password())
        time.sleep(1)
        # get users tags and shuffles them
        tag_names = [str(tag) for tag in job.i_user.tags]
        tags = shuffle(tag_names)
        for tag in tags:
            insta.search(tag)
            count += insta.like_tag(tag)
            time.sleep(5)
    except Exception as e:
        job.error = '{}: {}'.format(type(e), e)
    job.count = count
    job.finish()

    # new run for jobs
    new_job = schedule_next_job(job, rando_hour())
    session.add(new_job)
    session.commit()
    insta.driver.quit()
    return job


def follow(job, session=None):
    new_follows = []
    count = 0
    try:
        insta = Insta()
        insta.login(username=job.i_user.username,
                    password=job.i_user.get_password())
        time.sleep(1)
        # get users tags and shuffles them
        tag_names = [str(tag) for tag in job.i_user.tags]
        tags = shuffle(tag_names)
        for tag in tags:
            insta.search(tag)
            users, finished = insta.follow(tag)
            count += len(users)
            new_follows += users
            if finished is True:
                break
            time.sleep(5)
    except Exception as e:
        job.error = '{}: {}'.format(type(e), e)
    if len(new_follows) > 0:
        for user in new_follows:
            f = Following()
            f.timestamp = time.time()
            f.i_user = job.i_user
            f.other_user = user
            session.add(f)
    session.commit()
    job.count = count
    job.finish()
    # new run for jobs
    new_job = schedule_next_job(job, 1.5 * rando_hour())
    session.add(new_job)
    session.commit()
    insta.driver.quit()
    return job


def unfollow(job, session=None):
    deleted = []
    try:
        insta = Insta()
        insta.login(username=job.i_user.username,
                    password=job.i_user.get_password())
        time.sleep(1)
        # get users to unfollow
        following = job.i_user.following[:5]
        filtered_following = [f.other_user for f in following if
                                f.timestamp < time.time() - WEEK]
        deleted = insta.unfollow(following=filtered_following)
    except Exception as e:
        job.error = '{}: {}'.format(type(e), e)

    # remove deleted follows from the database
    if deleted:
        deleted_follows = (session.query(Following)
                                .filter(Following._user == job._user)
                                .filter(Following.other_user.in_(deleted))
                                .all())
        for follow in deleted_follows:
            session.delete(follow)
    session.commit()
    job.count = len(deleted)
    job.finish()
    # new run for job
    new_job = schedule_next_job(job, rando_hour())
    session.add(new_job)
    session.commit()
    insta.driver.quit()
    return job


def count_followers(job, session=None):
    insta = Insta()
    count = insta.count_followers(job.i_user.username)
    insta.driver.quit()

    job.count = count
    job.finish()
    new_job = schedule_next_job(job, DAY)
    session.add(new_job)
    session.commit()
    return job


def charge(job, session=None):
    p = Payment()
    try:
        user = job.i_user.user
        sub = job.i_user.subscription
        p.user = user
        p.timestamp = time.time()
        session.add(p)
        discount = [d for d in user.discounts if d.redeemed is False]
        cost = sub.cost
        if discount:
            for d in discount:
                cost -= d.amount
                d.redeemed = True
        session.commit()
        user.charge(cost)
        p.amount = (cost)
        p.paid_through = sub.months * MONTH
        if sub.type == 'continuous':
            next_charge = p.paid_through
        log('Charged {} ${}'.format(user.username, p.amount))
    except Exception as e:
        job.error = '{}: {}'.format(type(e), e)

    job.finish()
    session.commit()
    if next_charge > 0:
        new_job = Job(type='charge', run=next_charge)
        new_job.i_user = job.i_user
        session.add(new_job)
        session.commit()
    return job


def schedule_next_job(job, when):
    new_job = Job(type=job.type, run=time.time() + when)
    new_job.i_user = job.i_user
    ...
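Each task above ends by calling schedule_next_job so the job re-enqueues its own next run with a randomized delay. Below is a minimal sketch of that self-rescheduling pattern, assuming a plain stand-in Job class rather than the ORM model imported from orm.

# Illustrative sketch only: a stand-in Job class, not the project's ORM model.
import time


class Job:
    def __init__(self, type, run, i_user=None):
        self.type, self.run, self.i_user = type, run, i_user


def schedule_next_job(job, when):
    """Clone the finished job and schedule it `when` seconds in the future."""
    new_job = Job(type=job.type, run=time.time() + when)
    new_job.i_user = job.i_user
    return new_job


done = Job(type="like", run=time.time(), i_user="alice")
rerun = schedule_next_job(done, 60 * 60)  # run again in roughly an hour
print(rerun.type, rerun.run - done.run >= 60 * 60)  # like True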
