Best Python code snippet using grail_python
job.py
Source:job.py  
import logging
import msgpack
import os.path
import time
import uuid

from config import config
from db.mongo import MongoObject
from error import JobBrokenError
import helpers as h
import keys
from sync import sync_manager
from sync.error import (
    LockError,
    LockFailedError,
    InconsistentLockError,
    LockAlreadyAcquiredError,
    API_ERROR_CODE
)
from tasks import TaskFactory

import pymongo


logger = logging.getLogger('mm.jobs')

RESTORE_CFG = config.get('restore', {})


class Job(MongoObject):
    """
    Base class for jobs.

    Handles serialization (load / dump), timestamps, persistent locks and
    group marking. Several members used here are not defined in this class
    and have to be supplied elsewhere (presumably by subclasses):
    PARAMS, _involved_groups, _involved_couples, and real implementations
    of _set_resources / create_tasks.
    """

    STATUS_NOT_APPROVED = 'not_approved'
    STATUS_NEW = 'new'
    STATUS_EXECUTING = 'executing'
    STATUS_PENDING = 'pending'
    STATUS_BROKEN = 'broken'
    STATUS_COMPLETED = 'completed'
    STATUS_CANCELLED = 'cancelled'

    GROUP_LOCK_PREFIX = 'group/'
    COUPLE_LOCK_PREFIX = 'couple/'

    ACTIVE_STATUSES = (
        STATUS_NOT_APPROVED,
        STATUS_NEW,
        STATUS_EXECUTING,
        STATUS_PENDING,
        STATUS_BROKEN,
    )

    RESOURCE_FS = 'fs'
    RESOURCE_HOST_IN = 'host_in'
    RESOURCE_HOST_OUT = 'host_out'

    # Constructor keyword arguments that `new` forwards to __init__.
    COMMON_PARAMS = ('need_approving',)

    GROUP_FILE_MARKER_PATH = RESTORE_CFG.get('group_file_marker')
    GROUP_FILE_DIR_MOVE_SRC_RENAME = RESTORE_CFG.get('group_file_dir_move_src_rename')
    BACKEND_DOWN_MARKER = RESTORE_CFG.get('backend_down_marker')
    IDS_FILE_PATH = RESTORE_CFG.get('ids_file')

    DNET_CLIENT_ALREADY_IN_PROGRESS = -114

    def __init__(self, need_approving=False):
        """
        Creates an empty job with a fresh hex uuid as its id.
        Initial status is STATUS_NOT_APPROVED when need_approving is true,
        STATUS_NEW otherwise.
        """
        self.id = uuid.uuid4().hex
        self.status = (self.STATUS_NOT_APPROVED
                       if need_approving else
                       self.STATUS_NEW)
        self._create_ts = None
        self._start_ts = None
        self._update_ts = None
        self._finish_ts = None
        self.type = None
        self.tasks = []
        self.error_msg = []

    @classmethod
    def new(cls, session, **kwargs):
        """
        Creates a job from keyword arguments, then sets its resources,
        acquires its locks and marks its groups.
        Any failure is logged and re-raised; if marking groups fails the
        already acquired locks are released first.
        """
        super(Job, cls).new(session, **kwargs)

        cparams = {}
        for cparam in cls.COMMON_PARAMS:
            if cparam in kwargs:
                cparams[cparam] = kwargs[cparam]

        job = cls(**cparams)
        for param in cls.PARAMS:
            setattr(job, param, kwargs.get(param))

        ts = time.time()
        job.create_ts = ts
        job.update_ts = ts
        job._dirty = True

        try:
            job._set_resources()
        except Exception:
            logger.exception('Job {}: failed to set job resources'.format(job.id))
            raise

        try:
            job.perform_locks()
        except LockFailedError as e:
            logger.error('Job {0}: failed to perform required locks: {1}'.format(job.id, e))
            raise
        except LockError:
            logger.error('Job {0}: failed to perform required locks'.format(job.id))
            raise

        try:
            job.mark_groups(session)
        except Exception as e:
            logger.error('Job {0}: failed to mark required groups: {1}'.format(job.id, e))
            job.release_locks()
            raise

        return job

    def _set_resources(self):
        """
        Hook that has to be overridden; the base implementation always raises.
        """
        # NotImplementedError is the exception class; NotImplemented is a
        # non-callable constant and calling it produced a TypeError instead.
        raise NotImplementedError('_set_resources method for {} should be '
            'implemented'.format(type(self).__name__))

    @classmethod
    def from_data(cls, data):
        """
        Builds a job instance from a dumped dict (see load).
        """
        job = cls()
        job.load(data)
        return job

    def load(self, data):
        """
        Fills the job from a dumped dict and returns self.
        create_ts falls back to start_ts; update_ts falls back to
        finish_ts, then start_ts. Unicode values of PARAMS are encoded
        to utf-8 byte strings.
        """
        self.id = data['id'].encode('utf-8')
        self.status = data['status']
        self.create_ts = data.get('create_ts') or data['start_ts']
        self.start_ts = data['start_ts']
        self.finish_ts = data['finish_ts']
        self.update_ts = data.get('update_ts') or self.finish_ts or self.start_ts
        self.type = data['type']
        self.error_msg = data.get('error_msg', [])

        self.tasks = [TaskFactory.make_task(task_data, self) for task_data in data['tasks']]

        for param in self.PARAMS:
            val = data.get(param)
            if isinstance(val, unicode):
                val = val.encode('utf-8')
            setattr(self, param, val)

        return self

    def make_path(self, path, base_path=None):
        """
        Returns '' for an empty path, the path itself when it is absolute
        or no base_path is given, otherwise base_path joined with path.
        """
        if not path:
            return ''
        if os.path.isabs(path):
            return path
        if base_path:
            path = os.path.join(base_path, path)
        return path

    # Timestamps are stored as ints (or None); setters truncate via int().

    @property
    def create_ts(self):
        return self._create_ts

    @create_ts.setter
    def create_ts(self, value):
        self._create_ts = None if value is None else int(value)

    @property
    def start_ts(self):
        return self._start_ts

    @start_ts.setter
    def start_ts(self, value):
        self._start_ts = None if value is None else int(value)

    @property
    def update_ts(self):
        return self._update_ts

    @update_ts.setter
    def update_ts(self, value):
        self._update_ts = None if value is None else int(value)

    @property
    def finish_ts(self):
        return self._finish_ts

    @finish_ts.setter
    def finish_ts(self, value):
        self._finish_ts = None if value is None else int(value)

    def _dump(self):
        """
        Returns the job's own fields plus every attribute listed in PARAMS
        as a dict (tasks are not included).
        """
        data = {'id': self.id,
                'status': self.status,
                'create_ts': self.create_ts,
                'start_ts': self.start_ts,
                'update_ts': self.update_ts,
                'finish_ts': self.finish_ts,
                'type': self.type,
                'error_msg': self.error_msg}

        data.update({
            k: getattr(self, k)
            for k in self.PARAMS
        })
        return data

    def dump(self):
        """
        Returns _dump() extended with 'tasks' built from task.dump().
        """
        data = self._dump()
        data['tasks'] = [task.dump() for task in self.tasks]
        return data

    def human_dump(self):
        """
        Returns _dump() extended with 'tasks' built from task.human_dump().
        """
        data = self._dump()
        data['tasks'] = [task.human_dump() for task in self.tasks]
        return data

    def node_backend(self, host, port, backend_id):
        """
        Formats a node backend as '<host>:<port>/<backend_id>'.
        """
        return '{0}:{1}/{2}'.format(host, port, backend_id)

    def create_tasks(self):
        """
        Hook that has to be overridden; the base implementation always raises.
        """
        raise RuntimeError('Job creation should be implemented '
            'in derived class')

    def perform_locks(self):
        """
        Acquires persistent locks for self._locks with self.id as holder.

        If a lock is already held by another holder, the conflict is logged
        and re-raised. It is recorded in error_msg only when a previous
        error exists and differs from this one in code or holder_id.
        If the lock is already held by this job, a warning is logged and
        the call succeeds.
        """
        try:
            sync_manager.persistent_locks_acquire(self._locks, self.id)
        except LockAlreadyAcquiredError as e:
            if e.holder_id != self.id:
                logger.error('Job {0}: {1} is already '
                    'being processed by job {2}'.format(self.id, e.lock_id, e.holder_id))

                last_error = self.error_msg[-1] if self.error_msg else None
                # NOTE(review): with an empty error_msg list the conflict is
                # never recorded; confirm whether that is intended.
                if last_error and (last_error.get('code') != API_ERROR_CODE.LOCK_ALREADY_ACQUIRED or
                                   last_error.get('holder_id') != e.holder_id):
                    self.add_error(e)

                raise
            else:
                # NOTE(review): self.group is not defined in this class;
                # presumably set by subclasses through PARAMS - confirm.
                logger.warning('Job {0}: lock for group {1} has already '
                    'been acquired, skipping'.format(self.id, self.group))

    def mark_groups(self, session):
        """
        Writes updated metadata for every (group, meta) pair returned by
        _group_marks(). Failed writes are logged, not raised.
        """
        for group_id, updated_meta in self._group_marks():
            s = session.clone()
            s.set_groups([group_id])

            packed = msgpack.packb(updated_meta)
            _, failed_group = h.write_retry(
                s, keys.SYMMETRIC_GROUPS_KEY, packed)
            if failed_group:
                logger.error('Job {0}: failed to mark group {1}'.format(
                    self.id, group_id))

    def _group_marks(self):
        """
        Optional method for subclasses to mark groups.
        Returns iterable of (group, updated_meta)
        """
        return []

    def unmark_groups(self, session):
        """
        Writes updated metadata for every (group, meta) pair returned by
        _group_unmarks(). Failed writes are logged, not raised.
        """
        for group_id, updated_meta in self._group_unmarks():
            s = session.clone()
            s.set_groups([group_id])

            packed = msgpack.packb(updated_meta)
            _, failed_group = h.write_retry(
                s, keys.SYMMETRIC_GROUPS_KEY, packed)
            if failed_group:
                logger.error('Job {0}: failed to unmark group {1}'.format(
                    self.id, group_id))

    def _group_unmarks(self):
        """
        Optional method for subclasses to unmark groups.
        Returns iterable of (group, updated_meta)
        """
        return []

    def release_locks(self):
        """
        Releases persistent locks for self._locks held by self.id.
        An InconsistentLockError is logged and deliberately not re-raised.
        """
        try:
            sync_manager.persistent_locks_release(self._locks, self.id)
        except InconsistentLockError as e:
            logger.error('Job {0}: some of the locks {1} are already acquired by another '
                'job {2}'.format(self.id, self._locks, e.holder_id))

    @property
    def _locks(self):
        """
        Lock ids for the job: 'group/<id>' for each involved group followed
        by 'couple/<id>' for each involved couple.
        """
        return (['{0}{1}'.format(self.GROUP_LOCK_PREFIX, group)
                 for group in self._involved_groups] +
                ['{0}{1}'.format(self.COUPLE_LOCK_PREFIX, couple)
                 for couple in self._involved_couples])

    @property
    def involved_uncoupled_groups(self):
        '''Returns uncoupled groups' ids that are involved in the job
        '''
        return []

    def check_node_backends(self, group, node_backends_count=1):
        """
        Raises JobBrokenError unless the group has exactly
        node_backends_count node backends.
        """
        if len(group.node_backends) != node_backends_count:
            # Report the actually expected count instead of a hard-coded 1.
            raise JobBrokenError('Group {0} cannot be used for job, '
                                 'it has {1} node backends, {2} expected'.format(
                                     group.group_id, len(group.node_backends),
                                     node_backends_count))

    def complete(self, processor):
        """
        Finishes the job: runs on_complete for completed jobs, unmarks
        groups (failure is logged and re-raised), updates timestamps and
        releases locks.
        """
        if self.status == self.STATUS_COMPLETED:
            self.on_complete(processor)

        try:
            self.unmark_groups(processor.session)
        except Exception as e:
            logger.error('Job {0}: failed to unmark required groups: {1}'.format(
                self.id, e))
            raise

        ts = time.time()
        if not self.start_ts:
            self.start_ts = ts
        self.update_ts = ts
        self.finish_ts = ts

        self.release_locks()

    def add_error(self, e):
        """
        Appends e.dump() with the current time under 'ts' to error_msg.
        """
        error_msg = e.dump()
        error_msg['ts'] = time.time()
        self.error_msg.append(error_msg)

    def add_error_msg(self, msg):
        """
        Appends a plain text message with the current time to error_msg.
        """
        self.error_msg.append({'ts': time.time(), 'msg': msg})

    def on_start(self):
        """
        Performs checks before starting the job.
        Should raise JobBrokenError if job should not be started.
        """
        pass

    def on_complete(self, processor):
        """
        Performs action after job completion.
        """
        pass

    @staticmethod
    def list(collection, **kwargs):
        """
        Finds jobs in collection filtered by kwargs (None values are
        skipped) and sorted by 'sort_by' / 'sort_by_order'
        (create_ts, descending by default).

        TODO: This should be moved to some kind of Mongo Session object
        """
        params = {}
        sort_by = kwargs.pop('sort_by', 'create_ts')
        sort_by_order = kwargs.pop('sort_by_order', pymongo.DESCENDING)

        for k, v in kwargs.iteritems():
            if v is None:
                continue
            params.update(condition(k, v))

        return collection.find(params).sort(sort_by, sort_by_order)


def condition(field_name, field_val):
    # Lists and tuples are turned into a '$in' query on the field.
    if isinstance(field_val, (list, tuple)):
        return {field_name: {'$in': list(field_val)}}
    else:
        # The excerpt is cut off here ("..."); the rest of this function is
        # not visible, so the placeholder is kept instead of a guess.
        ...
Source:send_commands.py  
from nornir import InitNornir
from nornir.plugins.tasks.networking import netmiko_send_config, netmiko_send_command
from nornir.plugins.functions.text import print_title, print_result
from nornir.plugins.tasks import networking, text
from nornir.core.exceptions import NornirExecutionError
from colorama import Fore
from nornir.core.filter import F

red = Fore.RED
green = Fore.GREEN
reset = Fore.RESET


def send_config(task):
    """Render the base template for the host and push it with netmiko.

    The rendered text is stored on the host under "rendered_config" and the
    result of the deployment task is printed.
    """
    r = task.run(task=text.template_file,
                 name="AAA Tacacs Base configuration",
                 template="base.j2",
                 tacacs_server=task.host.data['tacacs_server'],
                 tacacs_key=task.host.data['tacacs_key'],
                 # Was read from 'radius_key' (copy-paste slip), which handed
                 # the key to the template as the server address.
                 radius_server=task.host.data['radius_server'],
                 radius_key=task.host.data['radius_key'],
                 mgmt_vlan=task.host.data['mgmt_vlan'],
                 source_int=task.host.data['source_int'],
                 path=f"templates/{task.host.data['hardware_type']}",)
    task.host["rendered_config"] = r.result
    cmd = task.run(netmiko_send_config,
                   name="Deploying config in the host",
                   config_commands=task.host["rendered_config"])
    print_result(cmd)


def _print_summary(hosts, failed_hosts):
    """Print one coloured Error/Success line per host name in *hosts*."""
    print('x' * 80)
    print("\t\t\t\tTASK SUMMARY")
    print('x' * 80)
    for x in hosts:
        if x in failed_hosts:
            # Reset the colour here too, so red does not leak into later output.
            print(red + (f"\t\t\tError\t\t-->\t{x}") + reset)
        else:
            print(green + (f"\t\t\tSuccess\t\t-->\t{x}") + reset)


def main():
    """Ask for a device group, run send_config on it and print a summary.

    Entering "all" runs against the whole inventory; any other input is used
    as a group name filter.
    """
    nr = InitNornir(config_file="config.yml")
    try:
        prompt = input("select a group of device: ")
        if prompt == "all":
            result = nr.run(task=send_config)
            _print_summary(nr.inventory.hosts, result.failed_hosts.keys())
        else:
            # The original `elif prompt == prompt` was always true.
            groups = nr.filter(F(groups__contains=prompt,))
            groups_result = groups.run(task=send_config)
            failed_group = groups_result.failed_hosts.keys()
            hosts = groups.inventory.hosts.keys()
            # NOTE(review): this passes the filtered Nornir object, not
            # groups_result; presumably it prints nothing - confirm intent.
            print_result(groups)
            _print_summary(hosts, failed_group)
    except IndexError:
        print("not allowed")


if __name__ == "__main__":
    # The excerpt is cut off after this guard ("..."); the original statement
    # is not visible, so the placeholder is kept instead of a guess.
    ...
Source:test_direct_exception_handling.py  
...17    passed_step()18    failed_step()19    raise Exception('we should not reach this')20@step(step_group=True)21def failed_group():22    passed_step()23    failed_step()24    raise Exception('we should not reach this too')25def method_fail_group():26    failed_group()27    raise Exception('we should not reach this even here')28def method_error():29    passed_step()30    error_step()31    raise Exception('we should not reach this')32@step(step_group=True)33def error_group():34    passed_step()35    error_step()36    raise Exception('we should not reach this too')37def method_error_group():38    error_group()39    raise Exception('we should not reach this even here')40class TestDirectHandling(TestCase):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
