Best Python code snippet using lemoncheesecake
tasks.py
Source:tasks.py  
...125            terminate_status = event_job_status.job_status(job_id, stop_job=True)126            if terminate_status['status']:127                update_workflow_status(spec, status=False, error='', workflow=saga.job.CANCELED)128                msg = f"Workflow Canceled for handover {spec['handover_token']}"129                log_and_publish(make_report('INFO', msg, spec))130                # stop all running beekeeper jobs131                event_job_status.stop_hive_jobs(spec['hive_db_uri'])132                return terminate_status133        return event_status134    except Exception as e:135        raise Exception(str(e))136def restart_workflow(restart_type: str, spec: dict) -> bool:137    """[Restart Stopped/Failed Workflows]138    Args:139        restart_type (str): [Option to restart beekeeper/init/entire workflow ]140        spec (dict): [Workflow payload]141    Raises:142        Exception: [When failed to restart the workflow]143    Returns:144        [bool]: [Restart status]145    """    146    try:147        update_workflow_status(spec, status=True, error='', workflow=states.STARTED)148        if restart_type == 'BEEKEEPER':149            current_job = spec['current_job']150            # set param init_pipeline to False to run beekeeper alone151            log_and_publish(make_report('INFO', f"RESTART {current_job['PipelineName']} Pipeline", spec))152            workflow_run_pipeline.delay(current_job, spec, init_pipeline=False)153        elif restart_type == 'INIT_PIPELINE':154            current_job = spec['current_job']155            log_and_publish(make_report('INFO', f"RESTART {current_job['PipelineName']} Pipeline", spec))156            workflow_run_pipeline.delay(current_job, spec)157        elif restart_type == 'SKIP_CURRENT_PIPELINE':158            current_job = spec['current_job']159            # set current job status to skipped for reference160            current_job['pipeline_status'] = 'SKIPPED'161            spec['completed_jobs'].append(current_job)162            log_and_publish(make_report('INFO', f"SKIPPED {current_job['PipelineName']} Pipeline", spec))163            monitor_process_pipeline.delay(spec)164        elif restart_type == 'WORKFLOW':165            current_job = [spec['current_job']]166            spec['flow'] = spec['completed_jobs'] + current_job + spec['flow']167            # reset the currentjob and completed job to null168            spec['current_job'] = {}169            spec['completed_jobs'] = []170            log_and_publish(make_report('INFO', 'Restart Entire Workflow', spec))171            workflow_run_pipeline.delay(current_job, spec)172        return True173    except Exception as e:174        raise Exception(str(e))175@app.task(bind=True, queue="event_job_status", default_retry_delay=120, max_retries=None)176def event_job_status(self, spec:dict, job_id, host:str) -> bool:177    """[Get Pipeline Status and update in ES]178    Args:179        spec ([dict]): [Workflow payload]180        job_id ([type]): [radical saga job id]181        host ([type]): [Host name where the job is running]182    Raises:183        self.retry: [When Job is running]184    Returns:185        [bool]: [Job status]186    """    187    try:188        host_con_details = pycfg.__dict__[host]189        event_job_status = RemoteCmd(190            REMOTE_HOST=host_con_details.get('REMOTE_HOST', None),191            ADDRESS=host_con_details.get('ADDRESS', None),192            USER=host_con_details.get('USER', None),193            PASSWORD=host_con_details.get('PASSWORD', None),194            WORKING_DIR=host_con_details.get('WORKING_DIR', None),195        )196        event_status = event_job_status.job_status(job_id)197        if event_status['status']:198            if event_status['job_status'] in [saga.job.DONE]:199                # check if beekeeper is completed successfully200                beekeeper_status = event_job_status.beekeper_status(spec['hive_db_uri'])201                if beekeeper_status['status'] and beekeeper_status['value'] == 'NO_WORK':202                    msg = f"Pipeline {spec['current_job']['PipelineName']} {saga.job.DONE} "203                    spec['status'] = True204                    spec['completed_jobs'].append(spec['current_job'])205                    spec['current_job'] = {}206                    log_and_publish(make_report('INFO', msg, spec))207                    # start another pipeline208                    monitor_process_pipeline.delay(spec)209                else:210                    msg = f"Pipeline {spec['current_job']['PipelineName']} beekeeper failed {beekeeper_status['error']}"211                    update_workflow_status(spec, status=False, error=beekeeper_status['value'],212                                           workflow=saga.job.FAILED)213                    log_and_publish(make_report('ERROR', msg, spec))214            if event_status['job_status'] in [saga.job.SUSPENDED, saga.job.CANCELED, saga.job.FAILED, saga.job.UNKNOWN]:215                msg = f"Pipeline {spec['current_job']['PipelineName']} {event_status['job_status']}"216                update_workflow_status(spec, status=False, error=event_status['error'],217                                       workflow=event_status['job_status'])218                log_and_publish(make_report('ERROR', msg, spec))219                return False220        else:221            msg = f"Failed to fetch Pipeline {spec['current_job']['PipelineName']}  status "222            update_workflow_status(spec, status=False, error=event_status['error'], workflow=saga.job.FAILED)223            log_and_publish(make_report('ERROR', msg, spec))224            return False225    except Exception as e:226        update_workflow_status(spec, status=False, error=str(e), workflow=saga.job.FAILED)227        log_and_publish(make_report('ERROR', str(e), spec))228        return False229    if event_status['status']:230        if event_status['job_status'] in [saga.job.PENDING, saga.job.RUNNING]:231            msg = f"Pipeline {spec['current_job']['PipelineName']} {saga.job.RUNNING}"232            spec_debug = {key: value for (key, value) in spec.items() if key not in ['flow', 'completed_jobs']}233            log_and_publish(make_report('DEBUG', msg, spec_debug))234            raise self.retry()235    return msg236@app.task(bind=True, queue="workflow", task_track_started=True,237          result_persistent=True)238def workflow_run_pipeline(self, run_job: dict, global_spec: dict, init_pipeline: Optional[bool]=True):239    """[Celery worker to initiate the pipeline and its beekeeper]240    Args:241        run_job (dict): [Current pipeline job to start]242        global_spec (dict): [Workflow information]243        init_pipeline (Optional[bool], optional): [to initiate the hive init_pipeline]. Defaults to True.244    Raises:245        ValueError: [When failed to initialize pipeline]246    Returns:247        [bool]: [pipeline initiation status]248    """    249    try:250        temp = construct_pipeline(run_job, global_spec)251        # execute remote command over ssh252        # get fram login details based on host253        host = temp['HOST'] if temp['HOST'] else pycfg.DEFAULT_HOST254        host_con_details = pycfg.__dict__[host]255        exece = RemoteCmd(256            REMOTE_HOST=host_con_details.get('REMOTE_HOST', None),257            ADDRESS=host_con_details.get('ADDRESS', None),258            USER=host_con_details.get('USER', None),259            PASSWORD=host_con_details.get('PASSWORD', None),260            WORKING_DIR=host_con_details.get('WORKING_DIR', None),261            mysql_url=temp['mysql_url']262        )263        global_spec['task_id'] = self.request.id264        global_spec['hive_db_uri'] = temp['mysql_url']265        msg = f"Pipeline {run_job['PipelineName']} Intiated"266        log_and_publish(make_report('DEBUG', msg, global_spec))267        job = {'status': True}268        if init_pipeline:269            job = exece.run_job(command=' '.join(temp['init']['command']), args=temp['init']['args'],270                                stdout=temp['init']['stdout'], stderr=temp['init']['stderr'], synchronus=True)271        if job['status']:272            job = exece.run_job(command=' '.join(temp['beekeeper']['command']),273                                args=temp['beekeeper']['args'], stdout=temp['beekeeper']['stdout'],274                                stderr=temp['beekeeper']['stderr'])275            if job['status']:276                global_spec['current_job']['job_id'] = job['job_id']277                global_spec['current_job']['HOST'] = host278                msg = f"Pipeline {run_job['PipelineName']} {job['state']}"279                log_and_publish(make_report('INFO', msg, global_spec))280                event_job_status.delay(global_spec, job['job_id'], host)281            else:282                raise ValueError(f"Pipeline {run_job['PipelineName']} failed : {job['error']}")283        else:284            raise ValueError(f"Pipeline {run_job['PipelineName']} failed: {job['error']}")285        return True286    except Exception as e:287        update_workflow_status(global_spec, status=False, error=str(e), workflow=saga.job.FAILED)288        log_and_publish(make_report('ERROR', str(e), global_spec))289        return f"{run_job['PipelineName']} : Exception error:  {str(e)}"290@app.task(bind=True, queue="monitor")291def monitor_process_pipeline(self, spec: dict):292    """[Checks Workflows Status and pending pipelines to run]293    Args:294        spec (dict): [Workflow payload details]295    Returns:296        [bool]: [Workflow status]297    """    298    try:299        if spec.get('status', False):300            if len(spec.get('flow', [])) > 0:301                job = spec['flow'].pop(0)302                spec['current_job'] = job303                spec['status'] = True304                spec['workflow'] = states.STARTED305                msg = f"Pipeline {job['PipelineName']} Started!"306                log_and_publish(make_report('INFO', msg, spec))307                # run pipeline job308                workflow_run_pipeline.delay(job, spec)309            elif len(spec.get('flow', [])) == 0:310                spec['status'] = True311                spec['current_job'] = {}312                spec['workflow'] = saga.job.DONE313                msg = f"Workflow completed for handover {spec['handover_token']}"314                log_and_publish(make_report('INFO', msg, spec))315        else:316            spec['status'] = False317            spec['workflow'] = saga.job.FAILED318            msg = f"Workflow failed to complete for handover {spec['handover_token']}"319            log_and_publish(make_report('ERROR', msg, spec))320    except Exception as e:321        update_workflow_status(spec, status=False, error=str(e), workflow=saga.job.FAILED)322        msg = f"Workflow failed to complete for handover {spec['handover_token']}: {str(e)}"323        log_and_publish(make_report('ERROR', msg, spec))324        return f"Error:  {str(e)}"325    return True326def initiate_pipeline(spec: dict, event: Optional[dict]={}, rerun: Optional[bool]=False) -> dict:327    """[Prepare Workflow payload from handover payload and initialize it]328    Args:329        spec (dict): [Handover Payload specification]   330        event (Optional[dict], optional): [event received]. Defaults to {}.331        rerun (Optional[bool], optional): [rerun type]. Defaults to False.332    Raises:333        Exception: [When failed to prepare workflow payload]334    Returns:335        dict: [Workflow initialize status with payload information]336    """    337    try:338        # prepare the payload with production pipelines based on dbtype and division339        spec.update(prepare_payload(spec))340        # set username to run the pipeline341        if not spec.get('user', None):342            spec['user'] = pycfg.FARM_USER343            # set hive url to run the pipelines344        if not spec.get('hive_url', None):345            spec['hive_url'] = pycfg.HIVE_URL346        if 'flow' not in spec or len(spec['flow']) == 0:347            raise Exception('Unable to construct workflow to run production pipeline.')348        # remove .....349        # spec['flow'] = [spec['flow'][0]]350        msg = f"Workflow Started for handover token {spec['handover_token']}"351        log_and_publish(make_report('INFO', msg, spec))352        # submit workflow to monitor queue353        monitor_process_pipeline.delay(spec)354        return {'status': True, 'error': '', 'spec': spec}355    except Exception as e:356        update_workflow_status(spec, status=False, error=str(e), workflow=saga.job.FAILED)357        msg = f"Workflow failed for handover token {spec['handover_token']}"358        log_and_publish(make_report('INFO', msg, spec))...test_models.py
Source:test_models.py  
...23        self.bench = Benchmark.objects.get(name='TestBench')24    def test_average_change_bad(self):25        self.make_result(12)26        s2 = self.make_result(15)27        rep = self.make_report(s2)28        self.assertEqual(rep.colorcode, 'red')29    def test_average_change_good(self):30        self.make_result(15)31        s2 = self.make_result(12)32        rep = self.make_report(s2)33        self.assertEqual(rep.colorcode, 'green')34    def test_within_threshold_none(self):35        self.make_result(15)36        s2 = self.make_result(15.2)37        rep = self.make_report(s2)38        self.assertEqual(rep.colorcode, 'none')39    def test_initial_revision_none(self):40        s2 = self.make_result(15)41        rep = self.make_report(s2)42        self.assertEqual(rep.colorcode, 'none')43    def test_bench_change_good(self):44        b1 = self.make_bench('b1')45        s1 = self.make_result(15)46        self.make_result(15, rev=s1, benchmark=b1)47        s2 = self.make_result(14.54)48        self.make_result(15, rev=s2, benchmark=b1)49        rep = self.make_report(s2)50        self.assertEqual(rep.colorcode, 'green')51        self.assertIn(self.bench.name, rep.summary)52    def test_bench_change_bad(self):53        b1 = self.make_bench('b1')54        s1 = self.make_result(15)55        self.make_result(15, rev=s1, benchmark=b1)56        s2 = self.make_result(15.46)57        self.make_result(15, rev=s2, benchmark=b1)58        rep = self.make_report(s2)59        self.assertEqual(rep.colorcode, 'red')60        self.assertIn(self.bench.name, rep.summary)61    # NOTE: Don't need to test with multiple projects since the calculation of62    # urgency doesn't take projects into account63    def test_average_change_beats_bench_change(self):64        b1 = self.make_bench('b1')65        s1 = self.make_result(15)66        self.make_result(15, rev=s1, benchmark=b1)67        s2 = self.make_result(14)68        self.make_result(15, rev=s2, benchmark=b1)69        rep = self.make_report(s2)70        self.assertIn('Average', rep.summary)71    def test_good_benchmark_change_beats_bad_average_trend(self):72        changes = self.make_bad_trend()73        b1 = self.make_bench('b1')74        for x in changes:75            s1 = self.make_result(x)76            if x != changes[-1]:77                self.make_result(x, rev=s1, benchmark=b1)78        self.make_result(changes[-2] * .97, rev=s1, benchmark=b1)79        rep = self.make_report(s1)80        self.assertEquals('green', rep.colorcode)81        self.assertIn('b1', rep.summary)82    def test_good_average_change_beats_bad_average_trend(self):83        changes = self.make_bad_trend()84        b1 = self.make_bench('b1')85        for x in changes:86            s1 = self.make_result(x)87            if x != changes[-1]:88                self.make_result(x, rev=s1, benchmark=b1)89        self.make_result(changes[-2] * .92, rev=s1, benchmark=b1)90        rep = self.make_report(s1)91        self.assertEquals('green', rep.colorcode)92        self.assertIn('Average', rep.summary)93    def test_good_change_beats_good_trend(self):94        changes = self.make_good_trend()95        b1 = self.make_bench('b1')96        for x in changes:97            s1 = self.make_result(x)98            if x != changes[-1]:99                self.make_result(x, rev=s1, benchmark=b1)100        self.make_result(changes[-2] * .95, rev=s1, benchmark=b1)101        rep = self.make_report(s1)102        self.assertIn('b1', rep.summary)103        self.assertNotIn('trend', rep.summary)104    def test_bad_trend_beats_good_trend(self):105        good_changes = self.make_good_trend()106        bad_changes = self.make_bad_trend()107        b1 = self.make_bench('b1')108        for i in range(len(good_changes)):109            s1 = self.make_result(good_changes[i])110            self.make_result(bad_changes[i], rev=s1, benchmark=b1)111        rep = self.make_report(s1)112        self.assertIn('trend', rep.summary)113        self.assertIn('b1', rep.summary)114        self.assertIn('yellow', rep.colorcode)115    def test_bad_change_beats_good_trend(self):116        changes = self.make_good_trend()117        b1 = self.make_bench('b1')118        for x in changes:119            s1 = self.make_result(x)120            if x != changes[-1]:121                self.make_result(x, rev=s1, benchmark=b1)122        self.make_result(changes[-2] * 1.05, rev=s1, benchmark=b1)123        rep = self.make_report(s1)124        self.assertIn('b1', rep.summary)125        self.assertNotIn('trend', rep.summary)126        self.assertEquals('red', rep.colorcode)127    def test_bad_beats_good_change(self):128        b1 = self.make_bench('b1')129        s1 = self.make_result(12)130        self.make_result(12, rev=s1, benchmark=b1)131        s2 = self.make_result(15)132        self.make_result(9, rev=s2, benchmark=b1)133        rep = self.make_report(s2)134        self.assertEqual(rep.colorcode, 'red')135    def test_bigger_bad_beats_smaller_bad(self):136        b1 = self.make_bench('b1')137        b2 = self.make_bench('b2')138        s1 = self.make_result(1.0)139        self.make_result(1.0, rev=s1, benchmark=b1)140        self.make_result(1.0, rev=s1, benchmark=b2)141        s2 = self.make_result(1.0)142        self.make_result(1.04, rev=s2, benchmark=b1)143        self.make_result(1.03, rev=s2, benchmark=b2)144        rep = self.make_report(s2)145        self.assertIn('b1', rep.summary)146        self.assertEquals('red', rep.colorcode)147    def test_multiple_quantities(self):148        b1 = self.make_bench('b1', quantity='Space', units='bytes')149        s1 = self.make_result(1.0)150        self.make_result(1.0, rev=s1, benchmark=b1)151        s2 = self.make_result(1.4)152        self.make_result(1.5, rev=s2, benchmark=b1)153        rep = self.make_report(s2)154        self.assertRegexpMatches(rep.summary, '[sS]pace')155        self.assertEquals('red', rep.colorcode)156    def make_result(self, value, rev=None, benchmark=None):157        from uuid import uuid4158        if not benchmark:159            benchmark = self.bench160        if not rev:161            commitdate = self.starttime + timedelta(days=self.days)162            cid = str(uuid4())163            Revision(commitid=cid, date=commitdate, branch=self.b,164                     project=self.pro).save()165            rev = Revision.objects.get(commitid=cid)166        Result(value=value, revision=rev, executable=self.exe,167               environment=self.env, benchmark=benchmark).save()168        self.days += 1169        return rev170    def make_report(self, revision):171        Report(revision=revision, environment=self.env,172               executable=self.exe).save()173        return Report.objects.get(revision=revision)174    def make_bench(self, name, quantity='Time', units='seconds'):175        Benchmark(name=name, units_title=quantity, units=units).save()176        return Benchmark.objects.get(name=name)177    def make_bad_trend(self):178        return self.make_trend(1)179    def make_good_trend(self):180        return self.make_trend(-1)181    def make_trend(self, direction):182        return [1 + direction * x * 1.25 *183                settings.TREND_THRESHOLD / 100 / settings.TREND184                for x in range(settings.TREND)]...test_record_attribute.py
Source:test_record_attribute.py  
1import logging2import unittest3from logging import Formatter4from urllib.parse import quote5from flask import Flask6from logging_utilities.filters import ConstAttribute7from logging_utilities.filters.flask_attribute import FlaskRequestAttribute8app = Flask(__name__)9class RecordAttributesTest(unittest.TestCase):10    def setUp(self):11        super().setUp()12        self.maxDiff = None13    @classmethod14    def _configure_const_attribute(cls, logger):15        logger.setLevel(logging.DEBUG)16        for handler in logger.handlers:17            const_attribute = ConstAttribute(application="test_application")18            handler.addFilter(const_attribute)19            handler.setFormatter(Formatter("%(levelname)s:%(application)s:%(message)s"))20    @classmethod21    def _configure_flask_attribute(cls, logger):22        logger.setLevel(logging.DEBUG)23        for handler in logger.handlers:24            const_attribute = FlaskRequestAttribute(25                attributes=['url', 'method', 'headers', 'json', 'query_string']26            )27            handler.addFilter(const_attribute)28            handler.setFormatter(29                Formatter(30                    "%(levelname)s:%(message)s:%(flask_request_url)s:%(flask_request_json)s:%(flask_request_query_string)s"31                )32            )33    def test_const_attribute(self):34        with self.assertLogs('test_formatter', level=logging.DEBUG) as ctx:35            logger = logging.getLogger('test_formatter')36            self._configure_const_attribute(logger)37            logger.info('Simple message')38            logger.info('Composed message: %s', 'this is a composed message')39            logger.info('Composed message %s', 'with extra', extra={'extra1': 23})40        self.assertEqual(41            ctx.output,42            [43                'INFO:test_application:Simple message',44                'INFO:test_application:Composed message: this is a composed message',45                'INFO:test_application:Composed message with extra'46            ]47        )48    def test_empty_flask_attribute(self):49        with self.assertLogs('test_formatter', level=logging.DEBUG) as ctx:50            logger = logging.getLogger('test_formatter')51            self._configure_flask_attribute(logger)52            logger.info('Simple message')53            logger.info('Composed message: %s', 'this is a composed message')54            logger.info('Composed message %s', 'with extra', extra={'extra1': 23})55        self.assertEqual(56            ctx.output,57            [58                'INFO:Simple message:::',59                'INFO:Composed message: this is a composed message:::',60                'INFO:Composed message with extra:::'61            ]62        )63    def test_flask_attribute_json(self):64        with self.assertLogs('test_formatter', level=logging.DEBUG) as ctx:65            logger = logging.getLogger('test_formatter')66            self._configure_flask_attribute(logger)67            with app.test_request_context('/make_report/2017', data={'format': 'short'}):68                logger.info('Simple message')69                logger.info('Composed message: %s', 'this is a composed message')70                logger.info('Composed message %s', 'with extra', extra={'extra1': 23})71            with app.test_request_context('/make_report/2017', data=''):72                logger.info('Simple message')73            with app.test_request_context(74                '/make_report/2017', data='non json data', content_type='application/json'75            ):76                logger.info('Simple message')77            with app.test_request_context(78                '/make_report/2017', data='{}', content_type='application/json'79            ):80                logger.info('Simple message')81            with app.test_request_context(82                '/make_report/2017',83                data='{"jsonData": "this is a json data"}',84                content_type='application/json'85            ):86                logger.info('Simple message')87        self.assertEqual(88            ctx.output,89            [90                # pylint: disable=line-too-long91                'INFO:Simple message:http://localhost/make_report/2017:None:',92                'INFO:Composed message: this is a composed message:http://localhost/make_report/2017:None:',93                'INFO:Composed message with extra:http://localhost/make_report/2017:None:',94                'INFO:Simple message:http://localhost/make_report/2017:None:',95                "INFO:Simple message:http://localhost/make_report/2017:b'non json data':",96                'INFO:Simple message:http://localhost/make_report/2017:{}:',97                "INFO:Simple message:http://localhost/make_report/2017:{'jsonData': 'this is a json data'}:",98            ]99        )100    def test_flask_attribute_query_string(self):101        with self.assertLogs('test_formatter', level=logging.DEBUG) as ctx:102            logger = logging.getLogger('test_formatter')103            self._configure_flask_attribute(logger)104            with app.test_request_context('/make_report/2017?param1=value1'):105                logger.info('Simple message')106                logger.info('Composed message: %s', 'this is a composed message')107                logger.info('Composed message %s', 'with extra', extra={'extra1': 23})108            with app.test_request_context('/make_report/2017?param1=value1¶m2=value2'):109                logger.info('Simple message')110            with app.test_request_context(f'/make_report/2017?param1={quote("This a string ?")}'):111                logger.info('Simple message')112        self.assertEqual(113            ctx.output,114            [115                # pylint: disable=line-too-long116                'INFO:Simple message:http://localhost/make_report/2017?param1=value1:None:param1=value1',117                'INFO:Composed message: this is a composed message:http://localhost/make_report/2017?param1=value1:None:param1=value1',118                'INFO:Composed message with extra:http://localhost/make_report/2017?param1=value1:None:param1=value1',119                'INFO:Simple message:http://localhost/make_report/2017?param1=value1¶m2=value2:None:param1=value1¶m2=value2',120                'INFO:Simple message:http://localhost/make_report/2017?param1=This%20a%20string%20%3F:None:param1=This%20a%20string%20%3F',121            ]...run_application.py
Source:run_application.py  
...23        scenario = killer_bots.KillerBotsScenario()24        core_logic.process_scenario(config, database, scenario)        25    else:26        print('%s is not a valid scenario' %(scenario_chosen))27def make_report(scenario_chosen):28    if scenario_chosen == 'chasing_target':29        config = config_application.ConfigChasingTarget()30        database = config_application.DatabaseConfig(config)31        report_writer.make_report(database, config.success_string)32    elif scenario_chosen == 'destroy_target':33        config = config_application.ConfigDestroyTarget()34        database = config_application.DatabaseConfig(config)35        report_writer.make_report(database, config.success_string)36    elif scenario_chosen == 'killer_bots':37        config = config_application.ConfigKillerBots()38        database = config_application.DatabaseConfig(config)39        report_writer.make_report(database, config.success_string)40    else:41        print('%s is not a valid scenario' %(scenario_chosen))42#def make_video(scenario_chosen):43#    print("from the application folder and animation_control module")44def process_inputs(inputs):45    action_chosen = inputs[1]46    scenario_chosen = inputs[2]47    if action_chosen == 'do_scenario':48        do_scenario(scenario_chosen)49    #elif action_chosen == 'make_video':50    #    make_video(scenario_chosen)51    elif action_chosen == 'make_report':52        make_report(scenario_chosen)53    else:54        print('%s is not a valid action' %(action_chosen))55def receive_inputs():56    inputs = sys.argv57    input_number_required = 358    if len(inputs) == input_number_required:59        process_inputs(inputs)60    else:61        print('Enter your scenario to process. Eg -'62              ' ```python3 run_application.py do_scenario chasing_target``` '63              'is a valid one')64receive_inputs()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
