Best Python code snippet using green
test_file_download.py
Source:test_file_download.py  
...1415        # Upload 1KB file using azcopy.16        src = file_path17        dest = util.test_share_url18        result = util.Command("copy").add_arguments(src).add_arguments(dest). \19            add_flags("log-level", "debug").execute_azcopy_copy_command()20        self.assertTrue(result)2122        # Verifying the uploaded file.23        # the resource local path should be the first argument for the azcopy validator.24        # the resource sas should be the second argument for azcopy validator.25        resource_url = util.get_resource_sas_from_share(filename)26        result = util.Command("testFile").add_arguments(file_path).add_arguments(resource_url).execute_azcopy_verify()27        self.assertTrue(result)2829        # downloading the uploaded file30        src = util.get_resource_sas_from_share(filename)31        dest = util.test_directory_path + "/test_1kb_file_download.txt"32        result = util.Command("copy").add_arguments(src).add_arguments(dest).add_flags("log-level",33                                                                                       "debug").execute_azcopy_copy_command()34        self.assertTrue(result)3536        # Verifying the downloaded file37        result = util.Command("testFile").add_arguments(dest).add_arguments(src).execute_azcopy_verify()38        self.assertTrue(result)3940    # test_upload_download_1kb_file_wildcard_all_files verifies the upload/download of 1Kb file with wildcard using azcopy.41    def test_upload_download_1kb_file_wildcard_all_files(self):42        # create file of size 1KB.43        filename = "test_upload_download_1kb_file_wildcard_all_files.txt"44        file_path = util.create_test_file(filename, 1024)4546        wildcard_path = file_path.replace(filename, "*")4748        # Upload 1KB file using azcopy.49        result = util.Command("copy").add_arguments(wildcard_path).add_arguments(util.test_share_url). \50            add_flags("log-level", "info").execute_azcopy_copy_command()51        self.assertTrue(result)5253        # Verifying the uploaded file.54        # the resource local path should be the first argument for the azcopy validator.55        # the resource sas should be the second argument for azcopy validator.56        resource_url = util.get_resource_sas_from_share(filename)57        result = util.Command("testFile").add_arguments(file_path).add_arguments(resource_url).execute_azcopy_verify()58        self.assertTrue(result)5960        # downloading the uploaded file61        src = util.get_resource_sas_from_share(filename)62        src_wildcard = util.get_resource_sas_from_share("*")63        dest = util.test_directory_path + "/test_upload_download_1kb_file_wildcard_all_files_dir"64        try:65            if os.path.exists(dest) and os.path.isdir(dest):66                shutil.rmtree(dest)67        except:68            self.fail('error removing directory ' + dest)69        finally:70            os.makedirs(dest)7172        result = util.Command("copy").add_arguments(src_wildcard).add_arguments(dest). \73            add_flags("log-level", "info").add_flags("include-pattern", filename.replace("wildcard", "*")). \74            execute_azcopy_copy_command()75        self.assertTrue(result)7677        # Verifying the downloaded file78        result = util.Command("testFile").add_arguments(os.path.join(dest, filename)).add_arguments(79            src).execute_azcopy_verify()80        self.assertTrue(result)8182    # test_upload_download_1kb_file_fullname verifies the upload/download of 1Kb file with wildcard using azcopy.83    def test_upload_download_1kb_file_wildcard_several_files(self):84        # create file of size 1KB.85        filename = "test_upload_download_1kb_file_wildcard_several_files.txt"86        prefix = "test_upload_download_1kb_file_wildcard_several*"87        file_path = util.create_test_file(filename, 1024)8889        wildcardSrc = file_path.replace(filename, prefix)90        # Upload 1KB file using azcopy.91        result = util.Command("copy").add_arguments(wildcardSrc).add_arguments(util.test_share_url). \92            add_flags("log-level", "info").execute_azcopy_copy_command()93        self.assertTrue(result)9495        # Verifying the uploaded file.96        # the resource local path should be the first argument for the azcopy validator.97        # the resource sas should be the second argument for azcopy validator.98        resource_url = util.get_resource_sas_from_share(filename)99        result = util.Command("testFile").add_arguments(file_path).add_arguments(resource_url).execute_azcopy_verify()100        self.assertTrue(result)101102        # downloading the uploaded file103        src = util.get_resource_sas_from_share(filename)104        wildcardSrc = util.get_resource_sas_from_share(prefix)105        dest = util.test_directory_path + "/test_upload_download_1kb_file_wildcard_several_files"106        try:107            if os.path.exists(dest) and os.path.isdir(dest):108                shutil.rmtree(dest)109        except:110            self.fail('error removing directory ' + dest)111        finally:112            os.makedirs(dest)113114        result = util.Command("copy").add_arguments(src).add_arguments(dest).add_flags("include-pattern", prefix). \115            add_flags("log-level", "info").execute_azcopy_copy_command()116        self.assertTrue(result)117118        # Verifying the downloaded file119        result = util.Command("testFile").add_arguments(os.path.join(dest, filename)).add_arguments(120            src).execute_azcopy_verify()121        self.assertTrue(result)122123    # util_test_n_1kb_file_in_dir_upload_download_share verifies the upload of n 1kb file to the share.124    def util_test_n_1kb_file_in_dir_upload_download_share(self, number_of_files):125        # create dir dir_n_files and 1 kb files inside the dir.126        dir_name = "dir_test_n_1kb_file_in_dir_upload_download_share_" + str(number_of_files) + "_files"127        sub_dir_name = "dir subdir_" + str(number_of_files) + "_files"128129        # create n test files in dir130        src_dir = util.create_test_n_files(1024, number_of_files, dir_name)131132        # create n test files in subdir, subdir is contained in dir133        util.create_test_n_files(1024, number_of_files, os.path.join(dir_name, sub_dir_name))134135        # execute azcopy command136        dest_share = util.test_share_url137        result = util.Command("copy").add_arguments(src_dir).add_arguments(dest_share). \138            add_flags("recursive", "true").add_flags("log-level", "info").execute_azcopy_copy_command()139        self.assertTrue(result)140141        # execute the validator.142        dest_azure_dir = util.get_resource_sas_from_share(dir_name)143        result = util.Command("testFile").add_arguments(src_dir).add_arguments(dest_azure_dir). \144            add_flags("is-object-dir", "true").execute_azcopy_verify()145        self.assertTrue(result)146147        download_azure_src_dir = dest_azure_dir148        download_local_dest_dir = src_dir + "_download"149150        try:151            if os.path.exists(download_local_dest_dir) and os.path.isdir(download_local_dest_dir):152                shutil.rmtree(download_local_dest_dir)153        except:154            self.fail("error removing " + download_local_dest_dir)155        finally:156            os.makedirs(download_local_dest_dir)157158        # downloading the directory created from azure file share through azcopy with recursive flag to true.159        result = util.Command("copy").add_arguments(download_azure_src_dir).add_arguments(160            download_local_dest_dir).add_flags("log-level", "info").add_flags("recursive", "true").execute_azcopy_copy_command()161        self.assertTrue(result)162163        # verify downloaded file.164        result = util.Command("testFile").add_arguments(os.path.join(download_local_dest_dir, dir_name)).add_arguments(165            download_azure_src_dir).add_flags("is-object-dir", "true").execute_azcopy_verify()166        self.assertTrue(result)167168    def test_6_1kb_file_in_dir_upload_download_share(self):169        self.util_test_n_1kb_file_in_dir_upload_download_share(6)170171    # util_test_n_1kb_file_in_dir_upload_download_azure_directory verifies the upload of n 1kb file to the share.172    def util_test_n_1kb_file_in_dir_upload_download_azure_directory(self, number_of_files, recursive):173        # create dir dir_n_files and 1 kb files inside the dir.174        dir_name = "util_test_n_1kb_file_in_dir_upload_download_azure_directory_" + recursive + "_" + str(175            number_of_files) + "_files"176        sub_dir_name = "dir_subdir_" + str(number_of_files) + "_files"177178        # create n test files in dir179        src_dir = util.create_test_n_files(1024, number_of_files, dir_name)180181        # create n test files in subdir, subdir is contained in dir182        util.create_test_n_files(1024, number_of_files, os.path.join(dir_name, sub_dir_name))183184        # prepare destination directory.185        # TODO: note azcopy v2 currently only support existing directory and share.186        dest_azure_dir_name = "dest azure_dir_name"187        dest_azure_dir = util.get_resource_sas_from_share(dest_azure_dir_name)188189        result = util.Command("create").add_arguments(dest_azure_dir).add_flags("serviceType", "File"). \190            add_flags("resourceType", "Bucket").execute_azcopy_create()191        self.assertTrue(result)192193        # execute azcopy command194        result = util.Command("copy").add_arguments(src_dir).add_arguments(dest_azure_dir). \195            add_flags("recursive", recursive).add_flags("log-level", "debug").execute_azcopy_copy_command()196        self.assertTrue(result)197        198        # execute the validator.199        dest_azure_dir_to_compare = util.get_resource_sas_from_share(dest_azure_dir_name + "/" + dir_name)200        result = util.Command("testFile").add_arguments(src_dir).add_arguments(dest_azure_dir_to_compare). \201            add_flags("is-object-dir", "true").add_flags("is-recursive", recursive).execute_azcopy_verify()202        self.assertTrue(result)203204        download_azure_src_dir = dest_azure_dir_to_compare205        download_local_dest_dir = src_dir + "_download"206207        try:208            if os.path.exists(download_local_dest_dir) and os.path.isdir(download_local_dest_dir):209                shutil.rmtree(download_local_dest_dir)210        except:211            print("catch error for removing " + download_local_dest_dir)212        finally:213            os.makedirs(download_local_dest_dir)214215        # downloading the directory created from azure file directory through azcopy with recursive flag to true.216        result = util.Command("copy").add_arguments(download_azure_src_dir).add_arguments(217            download_local_dest_dir).add_flags("log-level", "debug"). \218            add_flags("recursive", recursive).execute_azcopy_copy_command()219        self.assertTrue(result)220221        # verify downloaded file.222        # todo: ensure the comparing here223        result = util.Command("testFile").add_arguments(os.path.join(download_local_dest_dir, dir_name)).add_arguments(224            download_azure_src_dir). \225            add_flags("is-object-dir", "true").add_flags("is-recursive", recursive).execute_azcopy_verify()226        self.assertTrue(result)227228    def test_3_1kb_file_in_dir_upload_download_azure_directory_recursive(self):229        self.util_test_n_1kb_file_in_dir_upload_download_azure_directory(3, "true")230231    @unittest.skip("upload directory without --recursive specified is not supported currently.")232    def test_8_1kb_file_in_dir_upload_download_azure_directory_non_recursive(self):233        self.util_test_n_1kb_file_in_dir_upload_download_azure_directory(8, "false")234235    # test_download_preserve_last_modified_time verifies the azcopy downloaded file236    # and its modified time preserved locally on disk237    def test_download_preserve_last_modified_time(self):238        # create a file of 2KB239        filename = "test_upload_preserve_last_mtime.txt"240        file_path = util.create_test_file(filename, 2048)241242        # upload file through azcopy.243        destination_sas = util.get_resource_sas_from_share(filename)244        result = util.Command("copy").add_arguments(file_path).add_arguments(destination_sas). \245            add_flags("log-level", "debug").add_flags("recursive", "true").execute_azcopy_copy_command()246        self.assertTrue(result)247248        # Verifying the uploaded file249        result = util.Command("testFile").add_arguments(file_path).add_arguments(destination_sas).execute_azcopy_verify()250        self.assertTrue(result)251252        time.sleep(5)253254        # download file through azcopy with flag preserve-last-modified-time set to true255        download_file_name = util.test_directory_path + "/test_download_preserve_last_mtime.txt"256        result = util.Command("copy").add_arguments(destination_sas).add_arguments(download_file_name).add_flags("log-level",257                                                                                                                 "debug").add_flags(258            "preserve-last-modified-time", "true").execute_azcopy_copy_command()259        self.assertTrue(result)260261        # Verifying the downloaded file and its modified with the modified time of file.262        result = util.Command("testFile").add_arguments(download_file_name).add_arguments(destination_sas).add_flags(263            "preserve-last-modified-time", "true").execute_azcopy_verify()264        self.assertTrue(result)265266    # test_file_download_63mb_in_4mb downloads 63mb file in block of 4mb through azcopy267    def test_file_download_63mb_in_4mb(self):268        # create file of 63mb269        file_name = "test_63mb_in4mb_upload.txt"270        file_path = util.create_test_file(file_name, 63 * 1024 * 1024)271272        # uploading file through azcopy with flag block-size set to 4mb273        destination_sas = util.get_resource_sas_from_share(file_name)274        result = util.Command("copy").add_arguments(file_path).add_arguments(destination_sas).add_flags("log-level",275                                                                                                        "info").add_flags(276            "block-size-mb", "4").execute_azcopy_copy_command()277        self.assertTrue(result)278279        # verify the uploaded file.280        result = util.Command("testFile").add_arguments(file_path).add_arguments(destination_sas).execute_azcopy_verify()281        self.assertTrue(result)282283        # downloading the created parallely in blocks of 4mb file through azcopy.284        download_file = util.test_directory_path + "/test_63mb_in4mb_download.txt"285        result = util.Command("copy").add_arguments(destination_sas).add_arguments(download_file).add_flags("log-level",286                                                                                                            "info").add_flags(287            "block-size-mb", "4").execute_azcopy_copy_command()288        self.assertTrue(result)289290        # verify the downloaded file291        result = util.Command("testFile").add_arguments(download_file).add_arguments(292            destination_sas).execute_azcopy_verify()293        self.assertTrue(result)294295    # test_recursive_download_file downloads a directory recursively from share through azcopy296    def test_recursive_download_file(self):297        # create directory and 5 files of 1KB inside that directory.298        dir_name = "dir_" + str(10) + "_files"299        dir1_path = util.create_test_n_files(1024, 5, dir_name)300301        # upload the directory to share through azcopy with recursive set to true.302        result = util.Command("copy").add_arguments(dir1_path).add_arguments(util.test_share_url).add_flags("log-level",303                                                                                                            "info").add_flags(304            "recursive", "true").execute_azcopy_copy_command()305        self.assertTrue(result)306307        # verify the uploaded file.308        destination_sas = util.get_resource_sas_from_share(dir_name)309        result = util.Command("testFile").add_arguments(dir1_path).add_arguments(destination_sas).add_flags("is-object-dir",310                                                                                                            "true").execute_azcopy_verify()311        self.assertTrue(result)312313        try:314            shutil.rmtree(dir1_path)315        except OSError as e:316            self.fail("error removing the uploaded files. " +  str(e))317318        # downloading the directory created from share through azcopy with recursive flag to true.319        result = util.Command("copy").add_arguments(destination_sas).add_arguments(util.test_directory_path).add_flags(320            "log-level", "info").add_flags("recursive", "true").execute_azcopy_copy_command()321        self.assertTrue(result)322323        # verify downloaded file.324        result = util.Command("testFile").add_arguments(dir1_path).add_arguments(destination_sas).add_flags("is-object-dir",325                                                                                                            "true").execute_azcopy_verify()
...dag.py
Source:dag.py  
1from hierarchies.tree import Node2from jobmon.client.swarm.workflow.python_task import PythonTask3from cascade_ode.demographics import Demographics4from cascade_ode import sge5from cascade_ode.sge import queue_from_runtime6from cascade_ode.settings import load as load_settings7from cascade_ode import drill8settings = load_settings()9class DagNode:10    """11    A node in the cascade job graph. Can be either global, upload, or12    location/year/sex specific node.13    Attributes:14        job_name (str): The SGE job name15        args (List[Union[int, str]]): List of input arguments to job16        upstream_jobs (List[str]): List of job_names of other jobs that must17            finish first before this job can run18            (can be empty list, for global jobs)19        task (List[jobmon.client.swarm.workflow.python_task]): Initially20            empty list. When driver class creates PythonTasks for each21            DagNode in the job dag, this task list is populated with the22            task (this is used to build graph since PythonTask instantiation23            involves passing other PythonTask objects to specify upstream24            dependencies)25        details (dict[str, Any]): A dict that contains any information specific26            to a particular type of node. Currently, it only contains27            information for location specific nodes28        max_attempts (int): How many retries Jobmon should do.29        queue (str): all.q or long.q, which machines should run the job.30        host_type (str|None): Whether to restrict to Intel computers.31        j_resource (bool): Whether this needs a particular mounted drive.32    """33    def __init__(self, job_name, args, upstream_jobs, details):34        self.job_name = job_name35        self.args = args36        self.upstream_jobs = upstream_jobs37        self.task = list()38        self.details = details39        # This is a place to set defaults for all nodes.40        self.max_attempts = 341        self.host_type = 'intel'42    def __eq__(self, other):43        return self.__dict__ == other.__dict__44    def __hash__(self):45        return hash(self.__dict__)46def add_if_set(host_type):47    """The logic around this flag in qsub is strange. If it is true, then48    you get that resource. If they are false, you are excluded from49    nodes that have that resource. We want to not include the flag50    at all if they are false."""51    if host_type is not None:52        return {'sge_add_args': f'-l {host_type}'}53    else:54        return None55class GlobalDagNode(DagNode):56    '''57    A global cascade job that calls run_global.py58    '''59    name_template = 'dm_{mvid}_G{cv_iter}'60    script = f"{drill.this_path}/run_global.py"61    def add_job(self, wf, jobdag, mvm):62        """63        Create a PythonTask, add it to the workflow, and update the DagNode to64        contain a reference to the task.65        Args:66            wf (jobmon.client.swarm.workflow.workflow): jobmon workflow67            job_dag (dict[str, DagNode]): a mapping of job name to DagNode68            mvm (cascade_ode.importer.Importer.model_version_meta): a dataframe69                of model settings70        """71        # we ignore jobdag since we have no upstream dependencies72        slots, memory, runtime = sge.cluster_limits(73            'global', mvm, details=self.details)74        environment_variables_to_add = settings['env_variables']75        environment_variables_to_add['OMP_NUM_THREADS'] = slots76        memory, runtime = sge.update_ME_specific_allocations(77            modelable_entity_id=mvm.modelable_entity_id.unique()[0],78            memory=memory,79            runtime=runtime)80        task = PythonTask(81            script=self.script,82            args=self.args,83            name=self.job_name,84            upstream_tasks=list(),85            env_variables=environment_variables_to_add,86            num_cores=slots,87            max_runtime_seconds=runtime,88            queue=queue_from_runtime(runtime),89            m_mem_free=f"{memory}G",90            max_attempts=self.max_attempts,91            tag='global',92            j_resource=False)93        task.context_args = add_if_set(self.host_type)94        self.task.append(task)95        wf.add_task(task)96class ChildDagNode(DagNode):97    '''98    A sex/location/year specific cascade job that calls run_children.py99    '''100    name_template = 'dm_{mvid}_{loc_id}_{sex}_{year}_{cv_iter}'101    script = f"{drill.this_path}/run_children.py"102    def add_job(self, wf, job_dag, mvm):103        """104        Create a PythonTask, add it to the workflow, and update the DagNode to105        contain a reference to the task.106        Args:107            wf (jobmon.client.swarm.workflow.workflow): jobmon workflow108            job_dag (dict[str, DagNode]): a mapping of job name to DagNode109            mvm (cascade_ode.importer.Importer.model_version_meta): a dataframe110                of model settings111        """112        num_children = self.details['num_children']113        slots, memory, runtime = sge.cluster_limits(114            'node',115            mvm,116            num_children,117            details=self.details)118        environment_variables_to_add = settings['env_variables']119        environment_variables_to_add['OMP_NUM_THREADS'] = slots120        memory, runtime = sge.update_ME_specific_allocations(121            modelable_entity_id=mvm.modelable_entity_id.unique()[0],122            memory=memory,123            runtime=runtime)124        upstream_tasks = []125        for upstream_jobname in self.upstream_jobs:126            upstream_tasks.extend(job_dag[upstream_jobname].task)127        task = PythonTask(128            script=self.script,129            args=self.args,130            name=self.job_name,131            upstream_tasks=upstream_tasks,132            env_variables=environment_variables_to_add,133            num_cores=slots,134            max_runtime_seconds=runtime,135            queue=queue_from_runtime(runtime),136            m_mem_free=f"{memory}G",137            max_attempts=self.max_attempts,138            tag='node',139            j_resource=False)140        task.context_args = add_if_set(self.host_type)141        self.task.append(task)142        wf.add_task(task)143class UploadDagNode(DagNode):144    '''145    A single cascade job for uploading that calls varnish.py146    '''147    name_template = 'dm_{mvid}_varnish'148    script = f"{drill.this_path}/varnish.py"149    def add_job(self, wf, job_dag, mvm):150        """151        Create a PythonTask, add it to the workflow, and update the DagNode to152        contain a reference to the task.153        Args:154            wf (jobmon.client.swarm.workflow.workflow): jobmon workflow155            job_dag (dict[str, DagNode]): a mapping of job name to DagNode156            mvm (cascade_ode.importer.Importer.model_version_meta): a dataframe157                of model settings158        The varnish job does the following:159           1. Uploads fits160           2. Uploads adjusted data161           3. Computes fit statistics162           4. Uploads fit statistics163           5. Attempts to generate diagnostic plots164           5. Computes and uploads finals (save-results)165           6. Updates the status of the model to finished166        """167        slots, memory, runtime = sge.cluster_limits(168            'varnish', mvm, details=self.details)169        environment_variables_to_add = settings['env_variables']170        environment_variables_to_add['OMP_NUM_THREADS'] = slots171        # Varnish memory allocation shouldn't change with job size, and172        # large memory allocations will prohibit scheduling.173        _, runtime = sge.update_ME_specific_allocations(174            modelable_entity_id=mvm.modelable_entity_id.unique()[0],175            memory=memory,176            runtime=runtime)177        upstream_tasks = []178        for upstream_jobname in self.upstream_jobs:179            upstream_tasks.extend(job_dag[upstream_jobname].task)180        task = PythonTask(181            script=self.script,182            args=self.args,183            name=self.job_name,184            upstream_tasks=upstream_tasks,185            env_variables=environment_variables_to_add,186            num_cores=slots,187            max_runtime_seconds=runtime,188            queue=queue_from_runtime(runtime),189            m_mem_free=f"{memory}G",190            max_attempts=self.max_attempts,191            tag='upload',192            j_resource=True)193        task.context_args = add_if_set(self.host_type)194        self.task.append(task)195        wf.add_task(task)196def make_dag(mvid, loctree, cv_iter, add_arguments=None):197    """198    Build a dict that represents the cascade job graph. The dict is199    a mapping of job-name -> DagNode200    Args:201        mvid (int): model version id (used in job-name creation)202        loctree (hierarchies.tree.Tree): location hierarchy to build dag203            from204        cv_iter (int): cross validation iteration.205        add_arguments (List[str]): Arguments to add to every executable of206            the Cascade.207    Returns:208        dict[str, DagNode]209    """210    add_arguments = add_arguments if add_arguments else list()211    dag = {}212    demo = Demographics(mvid)213    global_jobs = make_global_jobs(mvid, cv_iter, add_arguments=add_arguments)214    global_job_names = list(global_jobs.keys())215    node_jobs = make_child_jobs(mvid, demo, cv_iter, global_job_names, loctree,216                                add_arguments=add_arguments217                                )218    node_job_names = list(node_jobs.keys())219    all_job_names = global_job_names + node_job_names220    varnish_job = make_upload_job(221        mvid, all_job_names, add_arguments=add_arguments)222    # this ordering is important -- when we build the jobmon workflow we223    # iterate through this dict and build PythonTasks. For every node in the224    # graph we assume all upstream tasks have already been visited and225    # their DagNodes have PythonTasks associated in DagNode.task226    all_dicts = [global_jobs, node_jobs, varnish_job]227    for d in all_dicts:228        dag.update(d)229    return dag230def make_global_jobs(mvid, cv_iter, add_arguments=None):231    """232    Returns a dict of job-name -> GlobalDagNode that represents all global233    jobs.234    If cv_iter is None, then this is just a dict of length 1. If cv_iter is235    an integer, the dict will be of length cv_iter.236    Args:237        mvid (int): model version id238        cv_iter (Opt[int]): Optional integer representing number of cross239            validation jobs240        add_arguments (List[str]): Arguments to add to every executable of241            the Cascade.242    Returns:243        dict[str, GlobalDagNode]244    """245    add_arguments = add_arguments if add_arguments else list()246    jobs = {}247    if cv_iter is None:248        cv_iter = [0]249    for i in cv_iter:250        job_name = GlobalDagNode.name_template.format(mvid=mvid, cv_iter=i)251        args = [mvid, '--cv_iter', i] + add_arguments252        upstream_jobs = []253        details = {}254        jobs[job_name] = GlobalDagNode(255            job_name, args, upstream_jobs, details)256    return jobs257def make_child_jobs(mvid, demo, cv_iter, global_job_names, lt,258                    add_arguments=None):259    """260    Returns a dict of job-name -> ChildDagNode that represents all261    run_children.py jobs.262    Args:263        mvid (int): model version id264        demo (cascade_ode.demographics.Demographics): demographic info265        cv_iter (Opt[int]): Optional integer representing number of cross266            validation jobs267        global_job_names (List[str]): We need list of global job names268            to associate upstream tasks of initial node jobs269        lt (hierarchies.tree.Tree): location hierarchy270        add_arguments (List[str]): Arguments to add to all jobs.271    Returns:272        dict[str, ChildDagNode]273    """274    add_arguments = add_arguments if add_arguments else list()275    jobs = {}276    if cv_iter is None:277        cv_iter = [0]278    sex_dict = {1: 'male', 2: 'female'}279    root_node = lt.root280    leaf_locs = Node.s_leaves(root_node)281    nodes_to_run = [l for l in lt.nodes if l not in leaf_locs]282    for i in cv_iter:283        global_job = [j for j in global_job_names if j.endswith("G" + str(i))]284        assert len(global_job) == 1285        for sex_id in demo.sex_ids:286            sex = sex_dict[sex_id]287            sex_short = sex[0]288            for year_id in demo.year_ids:289                year = str(year_id)[2:]290                for node in nodes_to_run:291                    parent_id = None if node == root_node else node.parent.id292                    if parent_id is None:293                        parent_name = global_job[0]294                    else:295                        parent_name = ChildDagNode.name_template.format(296                            mvid=mvid, loc_id=parent_id, sex=sex_short,297                            year=year, cv_iter=i)298                    this_job = ChildDagNode.name_template.format(299                        mvid=mvid, loc_id=node.id, sex=sex_short, year=year,300                        cv_iter=i)301                    args = [mvid, node.id, sex, year_id, i] + add_arguments302                    parent_jobs = [parent_name]303                    # we need to include num-children and demographic info304                    # here because we need305                    # that info to infer SGE memory allocation when building306                    # jobmon PythonTask307                    details = {308                        'num_children': len(node.children),309                        'location_id': node.id,310                        'sex': sex_short,311                        'year': str(year)312                    }313                    jobs[this_job] = ChildDagNode(314                        this_job, args, parent_jobs, details)315    return jobs316def make_upload_job(mvid, all_job_names, add_arguments=None):317    """318    Returns a dict of job-name -> UploadDagNode that represents the final319    upload job (varnish.py)320    Args:321        mvid (int): model version id322        all_job_names (List[str]): We need list of all job names323            to associate upstream tasks for varnish324        add_arguments (List[str]): Arguments to add to all jobs.325    Returns:326        dict[str, UploadDagNode]327    """328    add_arguments = add_arguments if add_arguments else list()329    job_name = UploadDagNode.name_template.format(mvid=mvid)330    args = [mvid] + add_arguments331    details = {}332    return {333        job_name: UploadDagNode(job_name, args, all_job_names, details)}334def check_error_msg_for_sigkill(error_msg):335    """336    Take an error string, return boolean if strings contain337    '<Signals.SIGKILL: 9>'338    '<Signals.SIGKILL: 9>' is a generic error code for when the scheduler kills339    a given process. We assume that when we run into this error that the dismod340    kernel has been killed by the SGE due to memory overages. This sigkill,341    however, is not propagated to the Python process, and thus jobmon does not342    allocate more resources upon this type of failure. We use this function to343    look for this so we can kill the parent python process that causes jobmon344    to bump the given task's memory.345    """...test_upload_page_blob.py
Source:test_upload_page_blob.py  
2import unittest3import os4class PageBlob_Upload_User_Scenarios(unittest.TestCase):5    def setUp(self):6        cmd = util.Command("login").add_arguments("--service-principal").add_flags("application-id", os.environ['ACTIVE_DIRECTORY_APPLICATION_ID'])7        cmd.execute_azcopy_copy_command()8    def tearDown(self):9        cmd = util.Command("logout")10        cmd.execute_azcopy_copy_command()11    # util_test_page_blob_upload_1mb verifies the azcopy upload of 1mb file12    # as a page blob.13    def util_test_page_blob_upload_1mb(self, use_oauth=False):14        # create the test gile.15        file_name = "test_page_blob_1mb.vhd"16        file_path = util.create_test_file(file_name, 1024 * 1024)17        # execute azcopy upload.18        if not use_oauth:19            dest = util.get_resource_sas(file_name)20            dest_validate = dest21        else:22            dest = util.get_resource_from_oauth_container(file_name)23            dest_validate = util.get_resource_from_oauth_container_validate(file_name)24        result = util.Command("copy").add_arguments(file_path).add_arguments(dest).add_flags("log-level", "info"). \25            add_flags("block-size-mb", "4").add_flags("blob-type", "PageBlob").execute_azcopy_copy_command()26        self.assertTrue(result)27        # execute validator.28        result = util.Command("testBlob").add_arguments(file_path).add_arguments(dest_validate). \29            add_flags("blob-type", "PageBlob").execute_azcopy_verify()30        self.assertTrue(result)31    # test_page_blob_upload_1mb_with_sas verifies the azcopy upload of 1mb file32    # as a page blob with sas.33    def test_page_blob_upload_1mb_with_sas(self):34        self.util_test_page_blob_upload_1mb(False)35    # test_page_blob_upload_1mb_with_oauth verifies the azcopy upload of 1mb file36    # as a page blob with oauth.37    def test_page_blob_upload_1mb_with_oauth(self):38        self.util_test_page_blob_upload_1mb(True)39    # test_page_range_for_complete_sparse_file verifies the number of Page ranges for40    # complete empty file i.e each character is Null character.41    def test_page_range_for_complete_sparse_file(self):42        # step 1: uploading a sparse file should be optimized43        # create test file.44        file_name = "sparse_file.vhd"45        file_path = util.create_complete_sparse_file(file_name, 16 * 1024 * 1024)46        # execute azcopy page blob upload.47        upload_destination_sas = util.get_resource_sas(file_name)48        result = util.Command("copy").add_arguments(file_path).add_arguments(upload_destination_sas).add_flags(49            "log-level", "info"). \50            add_flags("block-size-mb", "1").add_flags("blob-type", "PageBlob").execute_azcopy_copy_command()51        self.assertTrue(result)52        # execute validator.53        # no of page ranges should be 0 for the empty sparse file.54        result = util.Command("testBlob").add_arguments(file_path).add_arguments(upload_destination_sas). \55            add_flags("blob-type", "PageBlob").add_flags("verify-block-size", "true"). \56            add_flags("number-blocks-or-pages", "0").execute_azcopy_verify()57        self.assertTrue(result)58        # step 2: copy the blob to a second blob should also be optimized59        copy_destination_sas = util.get_resource_sas('sparse_file2.vhd')60        # execute copy61        result = util.Command("copy").add_arguments(upload_destination_sas).add_arguments(copy_destination_sas) \62            .add_flags("log-level", "info").add_flags("block-size-mb", "1").execute_azcopy_copy_command()63        self.assertTrue(result)64        # execute validator.65        # no of page ranges should be 0 for the empty sparse file.66        result = util.Command("testBlob").add_arguments(file_path).add_arguments(copy_destination_sas). \67            add_flags("blob-type", "PageBlob").add_flags("verify-block-size", "true"). \68            add_flags("number-blocks-or-pages", "0").execute_azcopy_verify()69        self.assertTrue(result)70        download_dest = util.test_directory_path + "/sparse_file_downloaded.vhd"71        result = util.Command("copy").add_arguments(copy_destination_sas).add_arguments(download_dest).add_flags(72            "log-level", "info").add_flags("block-size-mb", "1").execute_azcopy_copy_command()73        self.assertTrue(result)74        # Verifying the downloaded blob75        result = util.Command("testBlob").add_arguments(download_dest).add_arguments(76            copy_destination_sas).add_flags("blob-type", "PageBlob").execute_azcopy_verify()77        self.assertTrue(result)78    # test_page_blob_upload_partial_sparse_file verifies the number of page ranges79    # for PageBlob upload by azcopy.80    def test_page_blob_upload_partial_sparse_file(self):81        # step 1: uploading a sparse file should be optimized82        # create test file.83        file_name = "test_partial_sparse_file.vhd"84        file_path = util.create_partial_sparse_file(file_name, 16 * 1024 * 1024)85        # execute azcopy pageblob upload.86        upload_destination_sas = util.get_resource_sas(file_name)87        result = util.Command("copy").add_arguments(file_path).add_arguments(upload_destination_sas).add_flags(88            "log-level", "info"). \89            add_flags("block-size-mb", "4").add_flags("blob-type", "PageBlob").execute_azcopy_copy_command()90        self.assertTrue(result)91        # number of page range for partial sparse created above will be (size/2)92        number_of_page_ranges = int((16 * 1024 * 1024 / (4 * 1024 * 1024)) / 2)93        # execute validator to verify the number of page range for uploaded blob.94        result = util.Command("testBlob").add_arguments(file_path).add_arguments(upload_destination_sas). \95            add_flags("blob-type", "PageBlob").add_flags("verify-block-size", "true"). \96            add_flags("number-blocks-or-pages", str(number_of_page_ranges)).execute_azcopy_verify()97        self.assertTrue(result)98        # step 2: copy the blob to a second blob should also be optimized99        copy_destination_sas = util.get_resource_sas('sparse_file2.vhd')100        # execute copy101        result = util.Command("copy").add_arguments(upload_destination_sas).add_arguments(copy_destination_sas) \102            .add_flags("log-level", "info").add_flags("block-size-mb", "4").execute_azcopy_copy_command()103        self.assertTrue(result)104        # execute validator to verify the number of page range for uploaded blob.105        result = util.Command("testBlob").add_arguments(file_path).add_arguments(copy_destination_sas). \106            add_flags("blob-type", "PageBlob").add_flags("verify-block-size", "true"). \107            add_flags("number-blocks-or-pages", str(number_of_page_ranges)).execute_azcopy_verify()108        self.assertTrue(result)109        download_dest = util.test_directory_path + "/partial_sparse_file_downloaded.vhd"110        result = util.Command("copy").add_arguments(copy_destination_sas).add_arguments(download_dest).add_flags(111            "log-level", "info").add_flags("block-size-mb", "1").execute_azcopy_copy_command()112        self.assertTrue(result)113        # Verifying the downloaded blob114        result = util.Command("testBlob").add_arguments(download_dest).add_arguments(copy_destination_sas)\115            .add_flags("blob-type", "PageBlob").execute_azcopy_verify()116        self.assertTrue(result)117    def test_set_page_blob_tier(self):118        # test for P10 Page Blob Access Tier119        filename = "test_page_P10_blob_tier.vhd"120        file_path = util.create_test_file(filename, 100 * 1024)121        destination_sas = util.get_resource_sas_from_premium_container_sas(filename)122        result = util.Command("copy").add_arguments(file_path).add_arguments(destination_sas). \123            add_flags("log-level", "info").add_flags("blob-type", "PageBlob").add_flags("page-blob-tier",124                                                                                        "P10").execute_azcopy_copy_command()125        self.assertTrue(result)126        # execute azcopy validate order.127        # added the expected blob-tier "P10"128        result = util.Command("testBlob").add_arguments(file_path).add_arguments(destination_sas). \129            add_flags("blob-type", "PageBlob").add_flags("blob-tier", "P10").execute_azcopy_verify()130        self.assertTrue(result)131        # test for P20 Page Blob Access Tier132        filename = "test_page_P20_blob_tier.vhd"133        file_path = util.create_test_file(filename, 100 * 1024)134        destination_sas = util.get_resource_sas_from_premium_container_sas(filename)135        result = util.Command("copy").add_arguments(file_path).add_arguments(destination_sas). \136            add_flags("log-level", "info").add_flags("blob-type", "PageBlob").add_flags("page-blob-tier",137                                                                                        "P20").execute_azcopy_copy_command()138        self.assertTrue(result)139        # execute azcopy validate order.140        # added the expected blob-tier "P20"141        result = util.Command("testBlob").add_arguments(file_path).add_arguments(destination_sas). \142            add_flags("blob-type", "PageBlob").add_flags("blob-tier", "P20").execute_azcopy_verify()143        self.assertTrue(result)144        # test for P30 Page Blob Access Tier145        filename = "test_page_P30_blob_tier.vhd"146        file_path = util.create_test_file(filename, 100 * 1024)147        destination_sas = util.get_resource_sas_from_premium_container_sas(filename)148        result = util.Command("copy").add_arguments(file_path).add_arguments(destination_sas). \149            add_flags("log-level", "info").add_flags("blob-type", "PageBlob").add_flags("page-blob-tier",150                                                                                        "P30").execute_azcopy_copy_command()151        self.assertTrue(result)152        # execute azcopy validate order.153        # added the expected blob-tier "P30"154        result = util.Command("testBlob").add_arguments(file_path).add_arguments(destination_sas). \155            add_flags("blob-type", "PageBlob").add_flags("blob-tier", "P30").execute_azcopy_verify()156        self.assertTrue(result)157        # test for P4 Page Blob Access Tier158        filename = "test_page_P4_blob_tier.vhd"159        file_path = util.create_test_file(filename, 100 * 1024)160        destination_sas = util.get_resource_sas_from_premium_container_sas(filename)161        result = util.Command("copy").add_arguments(file_path).add_arguments(destination_sas). \162            add_flags("log-level", "info").add_flags("blob-type", "PageBlob").add_flags("page-blob-tier",163                                                                                        "P4").execute_azcopy_copy_command()164        self.assertTrue(result)165        # execute azcopy validate order.166        # added the expected blob-tier "P4"167        result = util.Command("testBlob").add_arguments(file_path).add_arguments(destination_sas). \168            add_flags("blob-type", "PageBlob").add_flags("blob-tier", "P4").execute_azcopy_verify()169        self.assertTrue(result)170        # test for P40 Page Blob Access Tier171        filename = "test_page_P40_blob_tier.vhd"172        file_path = util.create_test_file(filename, 100 * 1024)173        destination_sas = util.get_resource_sas_from_premium_container_sas(filename)174        result = util.Command("copy").add_arguments(file_path).add_arguments(destination_sas). \175            add_flags("log-level", "info").add_flags("blob-type", "PageBlob").add_flags("page-blob-tier",176                                                                                        "P40").execute_azcopy_copy_command()177        self.assertTrue(result)178        # execute azcopy validate order.179        # added the expected blob-tier "P40"180        result = util.Command("testBlob").add_arguments(file_path).add_arguments(destination_sas). \181            add_flags("blob-type", "PageBlob").add_flags("blob-tier", "P40").execute_azcopy_verify()182        self.assertTrue(result)183        # test for P50 Page Blob Access Tier184        filename = "test_page_P50_blob_tier.vhd"185        file_path = util.create_test_file(filename, 100 * 1024)186        destination_sas = util.get_resource_sas_from_premium_container_sas(filename)187        result = util.Command("copy").add_arguments(file_path).add_arguments(destination_sas). \188            add_flags("log-level", "info").add_flags("blob-type", "PageBlob").add_flags("page-blob-tier",189                                                                                        "P50").execute_azcopy_copy_command()190        self.assertTrue(result)191        # execute azcopy validate order.192        # added the expected blob-tier "P50"193        result = util.Command("testBlob").add_arguments(file_path).add_arguments(destination_sas). \194            add_flags("blob-type", "PageBlob").add_flags("blob-tier", "P50").execute_azcopy_verify()195        self.assertTrue(result)196        # test for P6 Page Blob Access Tier197        filename = "test_page_P6_blob_tier.vhd"198        file_path = util.create_test_file(filename, 100 * 1024)199        destination_sas = util.get_resource_sas_from_premium_container_sas(filename)200        result = util.Command("copy").add_arguments(file_path).add_arguments(destination_sas). \201            add_flags("log-level", "info").add_flags("blob-type", "PageBlob").add_flags("page-blob-tier",202                                                                                        "P6").execute_azcopy_copy_command()203        self.assertTrue(result)204        # execute azcopy validate order.205        # added the expected blob-tier "P50"206        result = util.Command("testBlob").add_arguments(file_path).add_arguments(destination_sas). \207            add_flags("blob-type", "PageBlob").add_flags("blob-tier", "P6").execute_azcopy_verify()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
