Best Python code snippet using avocado_python
test_tasks.py
Source:test_tasks.py  
...56    def test_http_error(self, status_code):57        with patch.object(tasks.lms, self.mocked_get_enrollments_method) as mock_get_enrollments:58            error = HTTPError(request=FakeRequest('registrar.edx.org'), response=FakeResponse(status_code))59            mock_get_enrollments.side_effect = error60            task = self.spawn_task()  # pylint: disable=assignment-from-no-return61            task.wait()62        expected_msg = f"HTTP error {status_code} when getting enrollments at registrar.edx.org"63        self.assert_failed(expected_msg)64    def test_invalid_data(self):65        with patch.object(tasks.lms, self.mocked_get_enrollments_method) as mock_get_enrollments:66            mock_get_enrollments.side_effect = ValidationError()67            task = self.spawn_task()  # pylint: disable=assignment-from-no-return68            task.wait()69        self.assert_failed("Invalid enrollment data from LMS")70    def test_invalid_format(self):71        with patch.object(tasks.lms, self.mocked_get_enrollments_method) as mock_get_enrollments:72            mock_get_enrollments.return_value = self.enrollment_data73            exception_raised = False74            try:75                task = self.spawn_task(file_format='invalid-format')  # pylint: disable=assignment-from-no-return76                task.wait()77            except ValueError as e:78                self.assertIn('Invalid file_format', str(e))79                exception_raised = True80        self.assertTrue(exception_raised)81@patch_discovery_program_details({})82class ListProgramEnrollmentTaskTests(ListEnrollmentTaskTestMixin, TestCase):83    """ Tests for task error behavior. """84    enrollment_statuses = (85        PROGRAM_ENROLLMENT_ENROLLED,86        PROGRAM_ENROLLMENT_PENDING,87    )88    mocked_get_enrollments_method = 'get_program_enrollments'89    def spawn_task(self, program_key=None, **kwargs):90        return tasks.list_program_enrollments.apply_async(91            (92                self.job_id,93                self.user.id,94                kwargs.get('file_format', 'json'),95                program_key or self.program.key,96            ),97            task_id=self.job_id98        )99@patch_discovery_program_details({})100class ListCourseRunEnrollmentTaskTests(ListEnrollmentTaskTestMixin, TestCase):101    """ Tests for task error behavior. """102    enrollment_statuses = (103        COURSE_ENROLLMENT_ACTIVE,104        COURSE_ENROLLMENT_INACTIVE,105    )106    mocked_get_enrollments_method = 'get_course_run_enrollments'107    internal_course_key = 'course-1'108    external_course_key = 'external_course_key'109    @classmethod110    def setUpTestData(cls):111        super().setUpTestData()112        for enrollment in cls.enrollment_data:113            enrollment['course_id'] = cls.external_course_key114    def spawn_task(self, program_key=None, **kwargs):115        return tasks.list_course_run_enrollments.apply_async(116            (117                self.job_id,118                self.user.id,119                kwargs.get('file_format', 'json'),120                program_key or self.program.key,121                self.internal_course_key,122                self.external_course_key,123            ),124            task_id=self.job_id125        )126@patch_discovery_program_details({127    'curricula': [{128        'is_active': True,129        'courses': [{130            'course_runs': [{131                'key': 'course-key',132                'external_key': 'external-key',133                'title': 'title',134                'marketing_url': 'www',135            }],136        }],137    }],138})139class ListAllCourseRunEnrollmentTaskTests(ListEnrollmentTaskTestMixin, TestCase):140    """ Tests for task error behavior. """141    enrollment_statuses = (142        COURSE_ENROLLMENT_ACTIVE,143        COURSE_ENROLLMENT_INACTIVE,144    )145    mocked_get_enrollments_method = 'get_course_run_enrollments'146    course_key = 'course-1'147    external_course_key = 'external_course_key'148    default_course_run = {'key': 'course-key', 'external_key': 'external-key'}149    @classmethod150    def setUpTestData(cls):151        super().setUpTestData()152        for enrollment in cls.enrollment_data:153            enrollment['course_id'] = cls.external_course_key154    def spawn_task(self, program_key=None, **kwargs):155        return tasks.list_all_course_run_enrollments.apply_async(156            (157                self.job_id,158                self.user.id,159                kwargs.get('file_format', 'json'),160                program_key or self.program.key,161            ),162            task_id=self.job_id163        )164class WriteEnrollmentTaskTestMixin(BaseTaskTestMixin, S3MockEnvVarsMixin):165    """166    Tests common for both program and course-run enrollment writing tasks.167    """168    json_filepath = "testfile.csv"169    mock_function = None  # Override in subclass170    expected_fieldnames = None  # Override in subclass171    @classmethod172    def setUpClass(cls):173        # This is unfortunately duplicated from:174        #   registrar.apps.api.v1.tests.test_views:S3MockMixin.175        # It would be ideal to move that mixin to a utilities file and re-use176        # it here, but moto seems to have a bug/"feature" where it only works177        # in modules that explicitly import it.178        super().setUpClass()179        cls._s3_mock = moto.mock_s3()180        cls._s3_mock.start()181        conn = boto3.resource('s3')182        conn.create_bucket(Bucket=settings.REGISTRAR_BUCKET)183    @classmethod184    def tearDownClass(cls):185        cls._s3_mock.stop()186        super().tearDownClass()187    def tearDown(self):188        super().tearDown()189        uploads_filestore.delete(self.json_filepath)190    def mock_write_enrollments(self, any_successes, any_failures):191        """192        Creates a mock function that returns results normally returned193        by an enrollment-writing function.194        """195        raise NotImplementedError()  # pragma: no cover196    def test_empty_list_file(self):197        uploads_filestore.store(self.json_filepath, "[]")198        with patch.object(199                tasks.lms,200                self.mock_function,201                new=self.mock_write_enrollments(False, False),202        ):203            self.spawn_task().wait()204        self.assert_succeeded(','.join(self.expected_fieldnames) + "\r\n", "204")205    def test_no_such_file(self):206        self.spawn_task().wait()207        self.assert_failed(208            f"Enrollment file for program_key={self.program.key} not found at {self.json_filepath}"209        )210    def test_file_not_json(self):211        uploads_filestore.store(self.json_filepath, "this is not valid json")212        self.spawn_task().wait()213        self.assert_failed(214            f"Enrollment file for program_key={self.program.key} at {self.json_filepath} is not valid JSON"215        )216@ddt.ddt217@patch_discovery_program_details({})218class WriteProgramEnrollmentTaskTests(WriteEnrollmentTaskTestMixin, TestCase):219    """220    Tests for write_program_enrollments task.221    """222    mock_function = 'write_program_enrollments'223    expected_fieldnames = ('student_key', 'status')224    def spawn_task(self, program_key=None, **kwargs):225        return tasks.write_program_enrollments.apply_async(226            (227                self.job_id,228                self.user.id,229                self.json_filepath,230                program_key or self.program.key,231            ),232            task_id=self.job_id233        )234    def mock_write_enrollments(self, any_successes, any_failures):235        """236        Create mock for data.write_program_enrollments237        Mock will return `any_successes`, `any_failures`, and `enrollments`238        echoed back as a dictionary.239        """240        def inner(_method, program_uuid, enrollments):241            """ Mock for data.write_program_enrollments. """242            self.assertIsInstance(program_uuid, UUID)243            results = OrderedDict([244                (enrollment['student_key'], enrollment['status'])245                for enrollment in enrollments246            ])247            return any_successes, any_failures, results248        return inner249    @ddt.data(250        (True, False, "200"),251        (True, True, "207"),252        (False, True, "422"),253    )254    @ddt.unpack255    def test_success(self, any_successes, any_failures, expected_code_str):256        enrolls = [257            {'student_key': 'john', 'status': 'x'},258            {'student_key': 'bob', 'status': 'y'},259            {'student_key': 'serena', 'status': 'z'},260        ]261        uploads_filestore.store(self.json_filepath, json.dumps(enrolls))262        with patch.object(263                tasks.lms,264                self.mock_function,265                new=self.mock_write_enrollments(any_successes, any_failures),266        ):267            self.spawn_task().wait()268        self.assert_succeeded(269            "student_key,status\r\n"270            "john,x\r\n"271            "bob,y\r\n"272            "serena,z\r\n",273            expected_code_str,274        )275@ddt.ddt276@patch_discovery_program_details({})277class WriteCourseRunEnrollmentTaskTests(WriteEnrollmentTaskTestMixin, TestCase):278    """279    Tests for write_course_run_enrollments task.280    """281    mock_function = 'write_course_run_enrollments'282    expected_fieldnames = ('course_id', 'student_key', 'status')283    def spawn_task(self, program_key=None, **kwargs):284        return tasks.write_course_run_enrollments.apply_async(285            (286                self.job_id,287                self.user.id,288                self.json_filepath,289                program_key or self.program.key,290            ),291            task_id=self.job_id292        )293    # pylint: disable=arguments-differ294    def mock_write_enrollments(self, any_successes, any_failures, expected_enrolls_by_course_key=None):295        """296        Create mock for data.write_course_run_enrollments.297        Mock will return `any_successes`, `any_failures`, and `enrollments`298        echoed back as a dictionary.299        """300        def inner(_method, program_uuid, course_key, enrollments):301            """ Mock for data.write_course_run_enrollments. """302            self.assertIsInstance(program_uuid, UUID)303            if expected_enrolls_by_course_key is not None:  # pragma: no branch304                self.assertListEqual(enrollments, expected_enrolls_by_course_key.get(course_key))305            results = OrderedDict([306                (enrollment['student_key'], enrollment['status'])307                for enrollment in enrollments308            ])309            return any_successes, any_failures, results310        return inner311    @ddt.data(312        (True, False, "200"),313        (True, True, "207"),314        (False, True, "422"),315    )316    @ddt.unpack317    @patch.object(318        ProgramDetails, 'get_course_key', lambda _self, x: x + '-internal'319    )320    def test_success_status(self, any_successes, any_failures, expected_code_str):321        enrolls = [322            {'student_key': 'john', 'status': 'x', 'course_id': 'course-1'},323            {'student_key': 'bob', 'status': 'y', 'course_id': 'course-1'},324            {'student_key': 'serena', 'status': 'z', 'course_id': 'course-2'},325        ]326        expected_enrolls_by_course_key = {327            "course-1-internal": [328                {'status': 'x', 'student_key': 'john'},329                {'status': 'y', 'student_key': 'bob'},330            ],331            "course-2-internal": [332                {'status': 'z', 'student_key': 'serena'},333            ]334        }335        uploads_filestore.store(self.json_filepath, json.dumps(enrolls))336        with patch.object(337                tasks.lms,338                self.mock_function,339                new=self.mock_write_enrollments(any_successes, any_failures, expected_enrolls_by_course_key),340        ):341            self.spawn_task().wait()342        self.assert_succeeded(343            "course_id,student_key,status\r\n"344            "course-1,john,x\r\n"345            "course-1,bob,y\r\n"346            "course-2,serena,z\r\n",347            expected_code_str,348        )349    @patch.object(350        ProgramDetails, 'get_course_key', lambda _self, x: x + '-internal'351    )352    def test_success_status_course_staff_included(self):353        enrolls = [354            {'student_key': 'john', 'status': 'x', 'course_id': 'course-1', 'course_staff': 'TRUE'},355            {'student_key': 'bob', 'status': 'y', 'course_id': 'course-1', 'course_staff': 'FALSE'},356            {'student_key': 'serena', 'status': 'z', 'course_id': 'course-2'},357        ]358        expected_enrolls_by_course_key = {359            "course-1-internal": [360                {'status': 'x', 'course_staff': 'TRUE', 'student_key': 'john'},361                {'status': 'y', 'course_staff': 'FALSE', 'student_key': 'bob'},362            ],363            "course-2-internal": [364                {'status': 'z', 'student_key': 'serena'},365            ]366        }367        uploads_filestore.store(self.json_filepath, json.dumps(enrolls))368        with patch.object(369                tasks.lms,370                self.mock_function,371                new=self.mock_write_enrollments(True, False, expected_enrolls_by_course_key),372        ):373            self.spawn_task().wait()374        expected_code_str = '200'375        self.assert_succeeded(376            "course_id,student_key,status,course_staff\r\n"377            "course-1,john,x,TRUE\r\n"378            "course-1,bob,y,FALSE\r\n"379            "course-2,serena,z,\r\n",380            expected_code_str,381        )382    @patch.object(383        ProgramDetails, 'get_course_key', {'course-1': 'course-1-internal'}.get384    )385    @patch_discovery_program_details({})386    def test_success_invalid_course_id(self):387        enrolls = [388            {'student_key': 'john', 'status': 'x', 'course_id': 'course-1'},389            {'student_key': 'bob', 'status': 'y', 'course_id': 'course-1'},390            {'student_key': 'serena', 'status': 'z', 'course_id': 'invalid-course'},391        ]392        expected_enrolls_by_course_key = {393            "course-1-internal": [394                {'status': 'x', 'student_key': 'john'},395                {'status': 'y', 'student_key': 'bob'},396            ],397            "invalid-course-internal": [398                {'status': 'z', 'student_key': 'serena'},399            ]400        }401        uploads_filestore.store(self.json_filepath, json.dumps(enrolls))402        with patch.object(403                tasks.lms,404                self.mock_function,405                new=self.mock_write_enrollments(True, False, expected_enrolls_by_course_key),406        ):407            self.spawn_task().wait()408        self.assert_succeeded(409            "course_id,student_key,status\r\n"410            "course-1,john,x\r\n"411            "course-1,bob,y\r\n"412            "invalid-course,serena,course-not-found\r\n",413            "207",...session.py
Source:session.py  
...34        self.got_socket.set()35    def unset_socket(self):36        self.socket = None37        self.got_socket.clear()38    def spawn_task(self, coro):39        task = asyncio.ensure_future(self.exceptionlogger(coro))40        self.tasks.append(task)41        return task42    async def exceptionlogger(self,coro):43        try:44            await coro45        except asyncio.CancelledError:46            pass # so we can await a task being stopped47        except:48            log.error('{}: Exception in task: {}'.format(self.session_id, traceback.format_exc()))49    async def stop_tasks(self):50        waitfor = []51        for task in self.tasks:52            if not task.done() and not task is asyncio.Task.current_task():53                task.cancel()54                waitfor.append(task)55        if waitfor:56            await asyncio.wait(waitfor) # necessary for server close57    async def send(self,text):58        socket = await self.get_socket()59        await socket.send_str(text)60        log.debug('{}: > {}'.format(self.session_id, text[:20]+'...'))61    async def get_socket(self):62        await self.got_socket.wait()63        return self.socket64    async def handle_incoming(self,text):65        log.debug('{}: < {} [IGNORED]'.format(self.session_id,text))66    async def close(self):67        if self.open:68            self.open = False69            log.debug('{}: CLOSING'.format(self.session_id))70            if self.socket is not None:71                await self.socket.close()72                self.unset_socket()73            await self.stop_tasks()74            log.info('{}: CLOSED'.format(self.session_id))75class FeedSession(BaseSession):76    def __init__(self,session_id,app):77        super().__init__(session_id)78        self.app=app79        self.info = SessionInfo()80        self.sendq = FiniteQueue()81        self.recvq = FiniteQueue()82        self.spawn_task(self.process_sendq())83        self.spawn_task(self.run_protocol())84        self.spawn_task(self.listen_info())85        self.spawn_task(self.send_passphrase())86        self.subscriberlists = defaultdict(list) # message class -> [Future]87    async def process_sendq(self):88        async for msg in self.sendq:89            while True:90                try:91                    await self.send(msg)92                    break93                except RuntimeError as err: #connection drop94                    log.info('{}: retry sending {} because of {}'.format(self.session_id, msg, err))95    async def handle_incoming(self,text):96        # dispatch to recvq so we can react immediately on connection drop/close97        await self.recvq.put(m.Message.parse(text))98    async def schedule_send(self,msg):99        await self.sendq.put(str(msg))100    async def run_protocol(self):101        await Redis.of(self.app).publish(self.session_id, m.TTY_Opened(ttyId=self.session_id))102        await self.schedule_banner()103        async for msg in self.recvq:104            subs = self.subscriberlists[msg.__class__]105            if subs:106                log.debug('{}: < {}'.format(self.session_id, msg.__class__.__name__))107                for sub in subs:108                    # NB will raise InvalidStateError if prev future was not consumed!109                    sub.set_result(msg)110            else:111                log.debug('{}: No subscribers for {}'.format(self.session_id,msg.__class__.__name__))112    async def schedule_banner(self):113        for line in banner.splitlines():114            await self.schedule_send(m.Line(line))115    async def close(self):116        if self.open:117            await Redis.of(self.app).publish(self.session_id, m.TTY_Closed(ttyId=self.session_id))118        await super().close()119    async def subscribe_once(self,mclass):120        fut = asyncio.Future()121        self.subscriberlists[mclass].append(fut)122        res = await fut123        self.subscriberlists[mclass].remove(fut)124        return res125    async def listen_info(self):126        async for msg in Subscriber(self.subscriberlists[m.Info]):127            for k, v in msg.__dict__.items():128                setattr(self.info, k, v)129            log.debug('{}: info acquired'.format(self.session_id))130    async def send_passphrase(self):131        msg = await self.subscribe_once(m.Info)132        text = 'Access this terminal with the following passphrase:'133        await self.schedule_send(m.Line(text))134        await self.schedule_send(m.Line(msg.passphrase))135    async def read_line_conv(self,socket,line=True,echo=False):136        rls = BaseSession(self.session_id+'_R')137        async def copy_from_feedclient():138            try:139                if line:140                    await self.schedule_send(m.ReadLine())141                    lr = await self.subscribe_once(m.LineRead)142                    await rls.send(lr.text)143                else:144                    await self.schedule_send(m.ReadKey(echo=echo))145                    kr = await self.subscribe_once(m.KeyRead)146                    await rls.send(kr.key)147            except asyncio.CancelledError:148                log.info('{} closing because parent session closes:'.format(rls.session_id))149            finally:150                await asyncio.shield(rls.close()) # otherwise it will indirectly cancel itself151        copy_task = self.spawn_task(copy_from_feedclient())152        await rls.run_socket(socket)153        await rls.close()154        if not copy_task.done():155            copy_task.cancel()156            await copy_task...inteltool.py
Source:inteltool.py  
...31        mr1.kills = True32        mr1.losses = True33        mr1.after = None34        print("Running Blingy Tackle Report...")35        spawn_task(mr1)36        # Capital Pilots 30 Day Profile37        pr1 = pvpreport.ProfileReport(get_new_session())38        pr1.alliance_ids = self.alliance_ids39        pr1.corporation_ids = self.corporation_ids40        pr1.groupids = self.CAPITAL_GROUP41        pr1.kills = True42        pr1.losses = False43        pr1.after = after44        print("Running Capital Pilots 30 Day Profile...")45        spawn_task(pr1)46        # Subcap Activity Kills 30 Days47        ar1 = pvpreport.ActivityReport(get_new_session())48        ar1.alliance_ids = self.alliance_ids49        ar1.corporation_ids = self.corporation_ids50        ar1.groupids = self.CAPITAL_GROUP51        ar1.groupidsinvert = True52        ar1.kills = True53        ar1.after = after54        print("Running Subcaps 30 Day Kills...")55        spawn_task(ar1)56        # Subcap Activity Loss 30 Days57        ar2 = pvpreport.ActivityReport(get_new_session())58        ar2.alliance_ids = self.alliance_ids59        ar2.corporation_ids = self.corporation_ids60        ar2.groupids = self.CAPITAL_GROUP61        ar2.groupidsinvert = True62        ar2.kills = False63        ar2.losses = True64        ar2.after = after65        spawn_task(ar2)66        # Captial Activity Kills 30 Days67        ar3 = pvpreport.ActivityReport(get_new_session())68        ar3.alliance_ids = self.alliance_ids69        ar3.corporation_ids = self.corporation_ids70        ar3.groupids = self.CAPITAL_GROUP71        ar3.groupidsinvert = False72        ar3.kills = True73        ar3.after = after74        print("Running Subcaps 30 Day Losses...")75        spawn_task(ar3)76        # CapvSub Activity Kills 30 Days77        ar4 = pvpreport.ActivityReport(get_new_session())78        ar4.alliance_ids = self.alliance_ids79        ar4.corporation_ids = self.corporation_ids80        ar4.groupids = self.CAPITAL_GROUP81        ar4.groupidsinvert = False82        ar4.againstgroup_ids = self.ALL_SHIPS83        ar4.kills = True84        ar4.after = after85        print("Running CapvSub Activity 30 Days...")86        spawn_task(ar4)87        kr1 = pvpreport.KillMailReport(get_new_session())88        kr1.alliance_ids = self.alliance_ids89        kr1.corporation_ids = self.corporation_ids90        kr1.groupids = self.CAPITAL_GROUP91        kr1.groupidsinvert = False92        kr1.againstgroup_ids = self.ALL_SHIPS93        kr1.after = after94        print("Running CapvSub Killmails 30 Days...")95        spawn_task(kr1)96        # Capital Activity Loss 30 Days97        ar5 = pvpreport.ActivityReport(get_new_session())98        ar5.alliance_ids = self.alliance_ids99        ar5.corporation_ids = self.corporation_ids100        ar5.groupids = self.CAPITAL_GROUP101        ar5.groupidsinvert = False102        ar5.kills = False103        ar5.losses = True104        ar5.after = after105        print("Running Capitals 30 Day Kills...")106        spawn_task(ar5)107        # Gang Report 30 Days108        gr1 = pvpreport.GangSizeReport(get_new_session())109        gr1.alliance_ids = self.alliance_ids110        gr1.corporation_ids = self.corporation_ids111        print("Running Gang Size Report...")112        spawn_task(gr1)113        # Wait for reports to complete114        wait_tasks()115        # Write report results to excel116        util.write_excel("%s_intel_data.xlsx" % (self.name), "Blingy Tackle", mr1.arr_result())117        util.write_excel("%s_intel_data.xlsx" % (self.name), "Capital Pilots 30 Days", pr1.arr_result())118        util.write_excel("%s_intel_data.xlsx" % (self.name), "Subcap Kill Activity 30 Days", ar1.arr_result())119        util.write_excel("%s_intel_data.xlsx" % (self.name), "Subcap Loss Activity 30 Days", ar2.arr_result())120        util.write_excel("%s_intel_data.xlsx" % (self.name), "Capital Kill Activity 30 Days", ar3.arr_result())121        util.write_excel("%s_intel_data.xlsx" % (self.name), "CapvSub Activity 30 Days", ar4.arr_result())122        util.write_excel("%s_intel_data.xlsx" % (self.name), "CapvSub Killmails 30 Days", kr1.arr_result())123        util.write_excel("%s_intel_data.xlsx" % (self.name), "Capital Loss Activity 30 Days", ar5.arr_result())...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
