Best Python code snippet using lisa_python
test_create_task_mappings.py
Source:test_create_task_mappings.py  
...393class TestFlippedTasksPerBuild:394    def test_get_flipped_tasks_per_build(self):395        build_variant = "test"396        tasks = [397            _mock_task(display_name=str(i), activated=True, status="Success") for i in range(12)398        ]399        build_mock = MagicMock(build_variant=build_variant)400        build_mock.get_tasks.return_value = tasks401        prev_version = MagicMock()402        prev_build = MagicMock()403        prev_build.get_tasks.return_value = [404            _mock_task(405                display_name=str(i), activated=True, status="Success" if (i % 2 == 0) else "Failed"406            )407            for i in range(11)408        ]409        prev_version.build_by_variant.return_value = prev_build410        next_version = MagicMock()411        next_build = MagicMock()412        next_build.get_tasks.return_value = [413            _mock_task(414                display_name=str(i), activated=True, status="Success" if (i % 3 == 0) else "Failed"415            )416            for i in range(11)417        ]418        next_version.build_by_variant.return_value = next_build419        flipped_tasks = under_test._get_flipped_tasks_per_build(420            build_mock, prev_version, next_version421        )422        expected = ["3", "9"]423        assert len(flipped_tasks) == len(expected)424        for name in expected:425            assert name in flipped_tasks426    def test_no_get_flipped_tasks_if_tasks_werent_active(self):427        build_variant = "test"428        tasks = [_mock_task(display_name=str(i), activated=False, status="True") for i in range(10)]429        build_mock = MagicMock(build_variant=build_variant)430        build_mock.get_tasks.return_value = tasks431        prev_version = MagicMock()432        prev_build = MagicMock()433        prev_build.get_tasks.return_value = [434            _mock_task(display_name=str(i), status=str(i % 2 == 0)) for i in range(9)435        ]436        prev_version.build_by_variant.return_value = prev_build437        next_version = MagicMock()438        next_build = MagicMock()439        next_build.get_tasks.return_value = [440            _mock_task(display_name=str(i), status=str(i % 3 == 0)) for i in range(9)441        ]442        next_version.build_by_variant.return_value = next_build443        flipped_tasks = under_test._get_flipped_tasks_per_build(444            build_mock, prev_version, next_version445        )446        assert len(flipped_tasks) == 0447    def test_no_get_flipped_tasks_if_prev_version_does_not_contain_variant(self):448        build_variant = "test"449        tasks = [450            _mock_task(display_name=str(i), activated=True, status="Success") for i in range(12)451        ]452        build_mock = MagicMock(build_variant=build_variant)453        build_mock.get_tasks.return_value = tasks454        prev_version = MagicMock()455        prev_version.build_by_variant.side_effect = KeyError456        next_version = MagicMock()457        next_build = MagicMock()458        next_build.get_tasks.return_value = [459            _mock_task(460                display_name=str(i), activated=True, status="Success" if (i % 3 == 0) else "Failed"461            )462            for i in range(11)463        ]464        next_version.build_by_variant.return_value = next_build465        flipped_tasks = under_test._get_flipped_tasks_per_build(466            build_mock, prev_version, next_version467        )468        assert len(flipped_tasks) == 0469    def test_no_get_flipped_tasks_if_next_version_does_not_contain_variant(self):470        build_variant = "test"471        tasks = [472            _mock_task(display_name=str(i), activated=True, status="Success") for i in range(12)473        ]474        build_mock = MagicMock(build_variant=build_variant)475        build_mock.get_tasks.return_value = tasks476        prev_version = MagicMock()477        prev_build = MagicMock()478        prev_build.get_tasks.return_value = [479            _mock_task(480                display_name=str(i), activated=True, status="Success" if (i % 2 == 0) else "Failed"481            )482            for i in range(11)483        ]484        prev_version.build_by_variant.return_value = prev_build485        next_version = MagicMock()486        next_version.build_by_variant.side_effect = KeyError487        flipped_tasks = under_test._get_flipped_tasks_per_build(488            build_mock, prev_version, next_version489        )490        assert len(flipped_tasks) == 0491class TestCreateTaskMap:492    def test_create_task_map(self):493        tasks = [MagicMock(display_name=f"name{i}") for i in range(7)]494        mapped_tasks = under_test._create_task_map(tasks)495        for task in tasks:496            assert mapped_tasks[task.display_name] is not None497            assert mapped_tasks[task.display_name] == task498    def test_create_task_map_execution(self):499        display_task = MagicMock(500            display_name="name", json={"execution_tasks": [f"id{i}" for i in range(7)]}501        )502        execution_tasks = [MagicMock(display_name=f"name{i}", task_id=f"id{i}") for i in range(7)]503        tasks = [display_task] + execution_tasks504        mapped_tasks = under_test._create_task_map(tasks)505        assert len(mapped_tasks) == 1506        assert mapped_tasks["name"] == display_task507class TestIsTaskAFlip:508    def test_non_activated_task_is_not_a_flip(self):509        mock_task = _mock_task(activated=False)510        assert not under_test._is_task_a_flip(mock_task, {}, {})511    def test_not_a_success_or_failed_status_is_not_a_flip(self):512        mock_task = _mock_task(activated=True, status="started")513        assert not under_test._is_task_a_flip(mock_task, {}, {})514    def test_no_previous_task_is_not_a_flip(self):515        mock_task = _mock_task(activated=True, status="success")516        mock_task.is_success.return_value = False517        assert not under_test._is_task_a_flip(mock_task, {}, {})518    def test_no_next_task_is_not_a_flip(self):519        mock_task = _mock_task(activated=True, status="success")520        mock_task.is_success.return_value = False521        mock_prev_task = _mock_task(activated=True, status=mock_task.status)522        tasks_prev = {mock_task.display_name: mock_prev_task}523        assert not under_test._is_task_a_flip(mock_task, tasks_prev, {})524    def test_all_statuses_same_is_no_flip(self):525        mock_task = _mock_task(activated=True, status="failed")526        mock_task.is_success.return_value = False527        mock_prev_task = _mock_task(activated=True, status=mock_task.status)528        tasks_prev = {mock_task.display_name: mock_prev_task}529        mock_next_task = _mock_task(activated=True, status=mock_task.status)530        tasks_next = {mock_task.display_name: mock_next_task}531        assert not under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)532    def test_next_and_current_same_status_is_a_flip(self):533        mock_task = _mock_task(activated=True, status="failed")534        mock_task.is_success.return_value = False535        mock_prev_task = _mock_task(activated=True, status="success")536        tasks_prev = {mock_task.display_name: mock_prev_task}537        mock_next_task = _mock_task(activated=True, status=mock_task.status)538        tasks_next = {mock_task.display_name: mock_next_task}539        assert under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)540        mock_task = _mock_task(activated=True, status="success")541        mock_task.is_success.return_value = False542        mock_prev_task = _mock_task(activated=True, status="failed")543        tasks_prev = {mock_task.display_name: mock_prev_task}544        mock_next_task = _mock_task(activated=True, status=mock_task.status)545        tasks_next = {mock_task.display_name: mock_next_task}546        assert under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)547    def test_prev_task_not_activated_is_not_a_flip(self):548        mock_task = _mock_task(activated=True, status="failed")549        mock_task.is_success.return_value = False550        mock_prev_task = _mock_task(activated=False, status="success")551        tasks_prev = {mock_task.display_name: mock_prev_task}552        mock_next_task = _mock_task(activated=True, status=mock_task.status)553        tasks_next = {mock_task.display_name: mock_next_task}554        assert not under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)555    def test_next_task_not_activated_is_not_a_flip(self):556        mock_task = _mock_task(activated=True, status="failed")557        mock_task.is_success.return_value = False558        mock_prev_task = _mock_task(activated=True, status="success")559        tasks_prev = {mock_task.display_name: mock_prev_task}560        mock_next_task = _mock_task(activated=False, status=mock_task.status)561        tasks_next = {mock_task.display_name: mock_next_task}562        assert not under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)563    def test_prev_and_current_same_status_is_not_a_flip(self):564        mock_task = _mock_task(activated=True, status="failed")565        mock_task.is_success.return_value = False566        mock_prev_task = _mock_task(activated=True, status=mock_task.status)567        tasks_prev = {mock_task.display_name: mock_prev_task}568        mock_next_task = _mock_task(activated=True, status="success")569        tasks_next = {mock_task.display_name: mock_next_task}570        assert not under_test._is_task_a_flip(mock_task, tasks_prev, tasks_next)571class TestMeaningfulTaskStatus:572    def test_success_failed_status_pass(self):573        pass_mock = MagicMock(status="success")574        assert under_test._check_meaningful_task_status(pass_mock)575        failed_mock = MagicMock(status="failed")576        assert under_test._check_meaningful_task_status(failed_mock)577    def test_other_task_status_do_not_pass(self):578        task_mock = MagicMock(status="started")579        assert not under_test._check_meaningful_task_status(task_mock)580def _mock_task(activated: bool = None, status: str = None, display_name: str = None):581    return MagicMock(activated=activated, status=status, display_name=display_name)582class TestGenerateTaskMappings:583    @patch(ns("TaskMappings.create_task_mappings"))584    def test_generates_task_mappings(self, create_task_mappings_mock):585        mock_evg_api = MagicMock()586        created_task_mock = MagicMock()587        created_task_mock.transform.return_value = ["mock-mappings"]588        create_task_mappings_mock.return_value = (created_task_mock, "most-recent-version-analyzed")589        task_mappings, most_recent_version_analyzed = under_test.generate_task_mappings(590            mock_evg_api,591            "mongodb-mongo-master",592            VersionLimit(stop_at_version_id="my-version"),593            ".*src",594            module_name="my-module",...rexflow_api.py
Source:rexflow_api.py  
...27    ValidatedPayload,28    ValidatorResults,29)30from rexflow_ui.errors import ValidationErrorDetails31def _mock_task():32    return Task(33        iid=MOCK_IID,34        tid=MOCK_TID,35        data=[36            TaskFieldData(37                dataId='uname',38                type='TEXT',39                order=1,40            ),41        ],42    )43def _mock_validation_error():44    return ValidationErrorDetails.init_from_payload(45        payload=ValidatedPayload(46            iid=MOCK_IID,47            tid=MOCK_TID,48            passed=False,49            results=[50                FieldValidationResult(51                    dataId=MOCK_DATA_ID,52                    passed=False,53                    results=[54                        ValidatorResults(55                            passed=False,56                            message='test failed validation',57                        ),58                    ],59                ),60            ],61            status=OperationStatus.FAILURE,62        ),63    )64def _mock_error():65    return ErrorDetails(message='test error')66def _mock_workflow(with_tasks=True):67    if with_tasks:68        tasks = [69            _mock_task(),70        ]71    else:72        tasks = []73    return Workflow(74        did=MOCK_DID,75        iid=MOCK_IID,76        status=WorkflowStatus.RUNNING,77        tasks=tasks,78    )79async def get_available_workflows(refresh=False) -> List[WorkflowDeployment]:80    return [81        WorkflowDeployment(82            name=MOCK_NAME,83            deployments=[MOCK_DID],84            bridge_url='',85        )86    ]87async def find_workflow_deployment(88    deployment_id: WorkflowDeploymentId,89) -> Optional[WorkflowDeployment]:90    return WorkflowDeployment(91        name=MOCK_NAME,92        deployments=[deployment_id],93        bridge_url='',94    )95async def start_workflow(96    deployment_id: WorkflowDeploymentId,97    metadata: List[MetaData] = [],98) -> Workflow:99    return _mock_workflow(with_tasks=False)100async def start_workflow_by_name(101    workflow_name: str,102    metadata: List[MetaData] = [],103) -> Workflow:104    return _mock_workflow(with_tasks=False)105async def refresh_workflows():106    pass107@validate_arguments108async def get_active_workflows(109    iids: List[WorkflowInstanceId] = [],110    metadata: Dict = {},111) -> List[Workflow]:112    return [_mock_workflow()]113async def complete_workflow(instance_id: WorkflowInstanceId) -> None:114    pass115async def cancel_workflow(116    instance_id: WorkflowInstanceId,117) -> bool:118    return True119@validate_arguments120async def start_tasks(121    iid: WorkflowInstanceId,122    tasks: List[TaskId]123) -> List[Task]:124    return [125        Task(126            iid=iid,127            tid=tid,128        )129        for tid in tasks130    ]131@validate_arguments132async def get_task(iid: WorkflowInstanceId, tid: TaskId) -> Task:133    return _mock_task()134@validate_arguments135async def validate_tasks(tasks: List[TaskChange]) -> TaskOperationResults:136    result = TaskOperationResults()137    if tasks:138        result.successful = [_mock_task()]139    else:140        result.errors = [_mock_validation_error(), _mock_error()]141    return result142@validate_arguments143async def save_tasks(tasks: List[TaskChange]) -> TaskOperationResults:144    result = TaskOperationResults()145    if tasks:146        result.successful = [_mock_task()]147    else:148        result.errors = [_mock_validation_error(), _mock_error()]149    return result150@validate_arguments151async def complete_tasks(tasks: List[TaskChange]) -> TaskOperationResults:152    result = TaskOperationResults()153    if tasks:154        result.successful = [_mock_task()]155    else:156        result.errors = [_mock_validation_error(), _mock_error()]...gcs_download_test.py
Source:gcs_download_test.py  
...6import tarfile7import unittest8from unittest import mock9from gcs_download import DownloadAndUnpackFromCloudStorage10def _mock_task(status_code: int = 0, stderr: str = '') -> mock.Mock:11  task_mock = mock.Mock()12  attrs = {13      'returncode': status_code,14      'wait.return_value': status_code,15      'communicate.return_value': (None, stderr.encode()),16  }17  task_mock.configure_mock(**attrs)18  return task_mock19@mock.patch('subprocess.Popen')20@mock.patch('tarfile.open')21class TestDownloadAndUnpackFromCloudStorage(unittest.TestCase):22  def testHappyPath(self, mock_tarfile, mock_popen):23    mock_popen.return_value = _mock_task()24    DownloadAndUnpackFromCloudStorage('', '')25  def testFailedTarOpen(self, mock_tarfile, mock_popen):26    mock_popen.return_value = _mock_task(stderr='some error')27    mock_tarfile.side_effect = tarfile.ReadError()28    with self.assertRaises(subprocess.CalledProcessError):29      DownloadAndUnpackFromCloudStorage('', '')30  def testBadTaskStatusCode(self, mock_tarfile, mock_popen):31    mock_popen.return_value = _mock_task(stderr='some error', status_code=1)32    with self.assertRaises(subprocess.CalledProcessError):33      DownloadAndUnpackFromCloudStorage('', '')34if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
