Source: parameter_server_optimizer.py
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess
import re
import os
import platform

from paddle import fluid
from paddle.fluid import core

from .meta_optimizer_base import MetaOptimizerBase
from ..base.private_helper_function import wait_server_ready


class ParameterServerOptimizer(MetaOptimizerBase):
    def __init__(self, optimizer):
        super(ParameterServerOptimizer, self).__init__(optimizer)
        self.inner_opt = optimizer
        # we do not allow meta optimizer to be inner optimizer currently
        self.meta_optimizers_white_list = []

    def _is_graph_out(self):
        return False

    def _can_apply(self):
        if self.role_maker._is_collective:
            return False
        k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
        return True if k_steps >= 0 else False

    def _get_distributed_strategy(self):
        from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory

        k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
        strategy = None

        if not self.user_defined_strategy.a_sync and k_steps == 0:
            strategy = StrategyFactory.create_sync_strategy()

        if self.user_defined_strategy.a_sync and k_steps == 0:
            strategy = StrategyFactory.create_async_strategy()

        if self.user_defined_strategy.a_sync and k_steps > 0:
            strategy = StrategyFactory.create_geo_strategy(k_steps)

        if not strategy:
            raise ValueError("k_steps must be a valid value, please check")

        return strategy

    def _build_trainer_programs(self, compiled_config):
        from paddle.fluid.incubate.fleet.parameter_server.ir import trainer_pass as worker

        _main = compiled_config.origin_main_program.clone()
        _startup = compiled_config.origin_startup_program.clone()

        if not compiled_config.is_geo_mode():
            # for main program
            _main = worker.delete_optimizer_pass(_main, compiled_config)
            _main = worker.distributed_ops_pass(_main, compiled_config)
            _main = worker.append_send_ops_pass(_main, compiled_config)

            # for startup program
            _startup = worker.fake_init_ops_pass(_startup, compiled_config)
            _startup = worker.init_from_server_pass(_startup, compiled_config)
            _startup = worker.delet_extra_optimizes_pass(_startup,
                                                         compiled_config)

            compiled_config.set_origin_ps_main_program(_main)
            compiled_config.set_origin_ps_startup_program(_startup)

            # for heter program
            if self.role_maker._is_heter_parameter_server_mode:
                from paddle.fluid.incubate.fleet.parameter_server.ir import heter_trainer_pass as heter_worker
                if self.role_maker._is_heter_worker():
                    # for heter worker
                    _main = heter_worker.split_heter_worker_ops_pass(
                        _main, compiled_config)
                else:
                    # for default worker
                    _main = heter_worker.split_trainer_ops_pass(_main,
                                                                compiled_config)
                # for startup change
                _startup = heter_worker.delete_startup_useless_ops_var_pass(
                    _startup, _main, compiled_config)
        else:
            _main = worker.append_send_ops_pass(_main, compiled_config)
            # the startup program is left unchanged in geo mode
            compiled_config.set_origin_ps_main_program(_main)
            compiled_config.set_origin_ps_startup_program(_startup)

        launch_barrier = self.user_defined_strategy.a_sync_configs[
            "launch_barrier"]
        launch_barrier_flag = int(os.getenv("FLAGS_LAUNCH_BARRIER", "1"))
        if launch_barrier and launch_barrier_flag:
            # for trainer wait server ready
            wait_server_ready(self.role_maker._get_pserver_endpoints())

            # for ps-heter mode, wait heter worker ready
            if self.role_maker._is_heter_parameter_server_mode and self.role_maker._is_worker(
            ):
                wait_server_ready(self.role_maker._get_heter_worker_endpoints())

        return _main, _startup

    def _build_pserver_programs(self, compiled_config):
        from paddle.fluid.incubate.fleet.parameter_server.ir import pserver_pass as server

        _main = fluid.Program()
        _startup = fluid.Program()

        if not compiled_config.is_geo_mode():
            _main = server.add_listen_and_serv_pass(_main, compiled_config)
            _main = server.add_rpc_global_flags_pass(_main, compiled_config)
            _main = server.add_optimizer_pass(_main, compiled_config)
            _main = server.large_scale_sparse_pass(_main, _main,
                                                   compiled_config, False)
            _startup = server.build_pserver_startup_program_pass(
                _startup, _main, compiled_config)
            _startup = server.large_scale_sparse_pass(_startup, _main,
                                                      compiled_config, True)

            if not compiled_config.is_sync_mode():
                _main = server.delete_unused_in_main_pass(_main,
                                                          compiled_config)

            _startup = server.delete_unused_in_startup_pass(_startup, _main,
                                                            compiled_config)
        else:
            _main = server.add_listen_and_serv_pass(_main, compiled_config)
            _main = server.add_rpc_global_flags_pass(_main, compiled_config)
            _main = server.add_geo_optimizer_pass(_main, compiled_config)
            _main = server.large_scale_sparse_pass(_main, _main,
                                                   compiled_config, False)
            _startup = server.build_pserver_startup_program_pass(
                _startup, _main, compiled_config)
            _startup = server.large_scale_sparse_pass(_startup, _main,
                                                      compiled_config, True)
            _startup = server.delete_unused_in_startup_pass(_startup, _main,
                                                            compiled_config)

        return _main, _startup

    def _can_apply_geo(self, dist_strategy, program):
        def get_sys_free_mem():
            plat = platform.system()
            if plat == "Darwin":
                vm = subprocess.Popen(
                    ['vm_stat'], stdout=subprocess.PIPE).communicate()[0]
                # vm_stat output arrives as bytes under Python 3; decode before parsing
                vm = vm.decode()
                vmLines = vm.split('\n')
                sep = re.compile(r':[\s]+')
                vmStats = {}
                for row in range(1, len(vmLines) - 2):
                    rowText = vmLines[row].strip()
                    rowElements = sep.split(rowText)
                    # vm_stat reports page counts; each page is 4096 bytes
                    vmStats[(rowElements[0]
                             )] = int(rowElements[1].strip(r'\.')) * 4096
                return vmStats["Pages free"]
            elif plat == "Linux":
                mems = {}
                with open('/proc/meminfo', 'rb') as f:
                    for line in f:
                        fields = line.split()
                        mems[fields[0]] = int(fields[1]) * 1024
                free = mems[b'MemFree:']
                return free
            else:
                raise ValueError(
                    "%s platform is unsupported in parameter server optimizer" %
                    (platform.system()))

        if not isinstance(self.inner_opt, fluid.optimizer.SGDOptimizer):
            return False

        free = get_sys_free_mem()

        from paddle.fluid.incubate.fleet.parameter_server.ir import vars_metatools

        processed_var_names = set(["@EMPTY@"])
        param_memory_size = 0
        for varname in program.global_block().vars:
            var = program.global_block().vars[varname]
            if not var.persistable or var.desc.type(
            ) != core.VarDesc.VarType.LOD_TENSOR:
                continue
            param = vars_metatools.create_var_struct(var)
            param_memory_size += param.m_size
            processed_var_names.add(varname)

        upper_mem_use = param_memory_size * 5.0

        program_tmp_vars = dict()
        eval_batch_size = 1024
        for op in program.global_block().ops:
            for var_name in op.output_arg_names:
                if var_name in processed_var_names:
                    continue
                processed_var_names.add(var_name)
                var = program.global_block().vars[var_name]

                if var.desc.type() != core.VarDesc.VarType.LOD_TENSOR:
                    continue

                data_count = 1
                neg_dim_count = 0
                for x in var.shape:
                    if x < 0:
                        if neg_dim_count >= 1:
                            raise ValueError(
                                "Var %s has more than one negative dim." %
                                (var_name))
                        neg_dim_count += 1
                        data_count *= (-x)
                    else:
                        data_count *= x
                program_tmp_vars[var_name] = (
                    data_count, neg_dim_count,
                    vars_metatools.dtype_to_size[var.dtype])

        for varname in program_tmp_vars:
            data_count, neg_dim_count, type_size = program_tmp_vars[varname]
            if neg_dim_count == 1:
                data_count *= eval_batch_size
            var_memory = data_count * type_size
            upper_mem_use += var_memory

        if upper_mem_use < free:
            return True
        else:
            return False

    def minimize_impl(self,
                      loss,
                      startup_program=None,
                      parameter_list=None,
                      no_grad_set=None):
        self.inner_opt.minimize(loss, startup_program, parameter_list,
                                no_grad_set)
        strategy = self._get_distributed_strategy()

        _origin_main_program = loss.block.program
        _origin_startup_program = startup_program
        from paddle.fluid.incubate.fleet.parameter_server.ir import public as public
        compiled_config = public.CompileTimeStrategy(_origin_main_program,
                                                     _origin_startup_program,
                                                     strategy, self.role_maker)
        compiled_config.strategy = strategy

        if self.role_maker._is_worker() or self.role_maker._is_heter_worker():
            main_program, startup_program = self._build_trainer_programs(
                compiled_config)
        elif self.role_maker._is_server():
            main_program, startup_program = self._build_pserver_programs(
                compiled_config)

        loss.block.program = main_program
        fluid.framework.switch_startup_program(startup_program)

        return None, None

    def _disable_strategy(self, dist_strategy):
        dist_strategy.a_sync = False
        a_sync_configs = dist_strategy.a_sync_configs
        a_sync_configs["k_steps"] = -1
        dist_strategy.a_sync_configs = a_sync_configs

    def _enable_strategy(self, dist_strategy, context):
        a_sync_configs = dist_strategy.a_sync_configs
        if a_sync_configs["k_steps"] >= 0:
            return

        dist_strategy.a_sync = True
        a_sync_configs = dist_strategy.a_sync_configs

        is_geo = self._can_apply_geo(dist_strategy,
                                     context["origin_main_program"])

        if is_geo:
            a_sync_configs["k_steps"] = 800
        else:
            a_sync_configs["k_steps"] = 0