How to use process_set method in tox

Best Python code snippet using tox_python

mpi_ops.py

Source: mpi_ops.py on GitHub

copy

Full Screen

1# Copyright 2019 Uber Technologies, Inc. All Rights Reserved.2# Modifications copyright Microsoft3# Modifications copyright (C) 2020, NVIDIA CORPORATION. All rights reserved.4#5# Licensed under the Apache License, Version 2.0 (the "License");6# you may not use this file except in compliance with the License.7# You may obtain a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16# ==============================================================================17# Load all the necessary PyTorch C types.18import torch19import warnings20from horovod.common.basics import HorovodBasics as _HorovodBasics21from horovod.common.exceptions import HorovodInternalError22from horovod.common.process_sets import _setup as _setup_process_sets23from horovod.common.process_sets import ProcessSet, global_process_set, add_process_set, remove_process_set24from horovod.common.util import check_installed_version, get_average_backwards_compatibility_fun, gpu_available, num_rank_is_power_225from horovod.torch.compression import Compression26# Check possible symbol not found error from pytorch version mismatch27try:28 from horovod.torch import mpi_lib_v2 as mpi_lib29except Exception as e:30 check_installed_version('pytorch', torch.__version__, e)31 raise e32else:33 check_installed_version('pytorch', torch.__version__)34_NULL = ""35_basics = _HorovodBasics(__file__, 'mpi_lib_v2')36# import basic methods37is_initialized = _basics.is_initialized38start_timeline = _basics.start_timeline39stop_timeline = _basics.stop_timeline40size = _basics.size41local_size = _basics.local_size42cross_size = _basics.cross_size43rank = _basics.rank44local_rank = 
_basics.local_rank45cross_rank = _basics.cross_rank46mpi_threads_supported = _basics.mpi_threads_supported47mpi_enabled = _basics.mpi_enabled48mpi_built = _basics.mpi_built49gloo_enabled = _basics.gloo_enabled50gloo_built = _basics.gloo_built51nccl_built = _basics.nccl_built52ddl_built = _basics.ddl_built53ccl_built = _basics.ccl_built54cuda_built = _basics.cuda_built55rocm_built = _basics.rocm_built56def shutdown(*args, **kwargs):57 mpi_lib.horovod_torch_reset()58 return _basics.shutdown(*args, **kwargs)59def init(*args, **kwargs):60 global _handle_map61 _handle_map = {}62 _basics.init(*args, **kwargs)63 # Call set up again to make sure the basics is in sync64 _setup_process_sets(_basics)65# import reduction op values66Average = _basics.Average67Sum = _basics.Sum68Adasum = _basics.Adasum69is_homogeneous = _basics.is_homogeneous70handle_average_backwards_compatibility = get_average_backwards_compatibility_fun(_basics)71_setup_process_sets(_basics)72# Schema: handle -> input, output73# We keep input in order to make sure it does not get garbage collected74# before the operation is finished.75_handle_map = {}76def _check_function(function_factory, tensor):77 function = function_factory(tensor)78 if not hasattr(mpi_lib, function):79 raise ValueError('Tensor type %s is not supported.' 
% tensor.type())80 if not tensor.is_contiguous():81 raise ValueError('Tensor is required to be contiguous.')82 return function83def _allreduce_function_factory(tensor):84 return 'horovod_torch_allreduce_async_' + tensor.type().replace('.', '_')85def _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor, process_set: ProcessSet):86 # Set the divisor for reduced gradients to average when necessary87 if op == Average:88 if rocm_built():89 # For ROCm, perform averaging at framework level90 divisor = size()91 op = Sum92 else:93 divisor = 194 elif op == Adasum:95 if process_set != global_process_set:96 raise NotImplementedError("Adasum does not support non-global process sets yet.")97 if tensor.device.type != 'cpu' and gpu_available('torch'):98 if nccl_built():99 if not is_homogeneous():100 raise NotImplementedError('Running GPU Adasum on heterogeneous cluster is not supported yet.')101 elif not num_rank_is_power_2(int(size() / local_size())):102 raise NotImplementedError('Running GPU Adasum with non-power of 2 nodes is not supported yet.')103 if rocm_built():104 # For ROCm, perform averaging at framework level105 divisor = local_size()106 else:107 divisor = 1108 else:109 warnings.warn('Adasum reduction does not currently support GPU reduction using MPI. Tensors are '110 'copied to CPU memory instead. 
To use Adasum for GPU reduction, please compile Horovod '111 'with HOROVOD_GPU_OPERATIONS=NCCL.')112 divisor = 1113 else:114 if not num_rank_is_power_2(size()):115 raise NotImplementedError('Running Adasum with non-power of 2 ranks is not supported yet.')116 divisor = 1117 else:118 divisor = 1119 if tensor.dtype == torch.int8 or tensor.dtype == torch.float16:120 function = _check_function(_allgather_function_factory, tensor)121 handle = getattr(mpi_lib, function)(122 tensor, output, name.encode() if name is not None else _NULL, process_set.process_set_id)123 else:124 function = _check_function(_allreduce_function_factory, tensor)125 handle = getattr(mpi_lib, function)(tensor, output, divisor, name.encode() if name is not None else _NULL, op, prescale_factor, postscale_factor, process_set.process_set_id)126 127 _handle_map[handle] = (tensor, output)128 return handle129def allreduce_async(tensor, average=None, name=None, op=None,130 prescale_factor=1.0, postscale_factor=1.0,131 process_set=global_process_set):132 """133 A function that performs asynchronous averaging or summation of the input tensor134 over all the Horovod processes. The input tensor is not modified.135 The reduction operation is keyed by the name. If name is not provided, an incremented136 auto-generated name is used. The tensor type and shape must be the same on all137 Horovod processes for a given name. The reduction will not start until all processes138 are ready to send and receive the tensor.139 Arguments:140 tensor: A tensor to reduce.141 average:142 .. warning:: .. deprecated:: 0.19.0143 Use `op` instead. Will be removed in v0.21.0.144 name: A name of the reduction operation.145 op: The reduction operation to combine tensors across different146 ranks. 
Defaults to Average if None is given.147 prescale_factor: Multiplicative factor to scale tensor before allreduce.148 postscale_factor: Multiplicative factor to scale tensor after allreduce.149 process_set: Process set object to limit this operation to a subset of150 Horovod processes. Default is the global process set.151 Returns:152 A handle to the allreduce operation that can be used with `poll()` or153 `synchronize()`.154 """155 op = handle_average_backwards_compatibility(op, average)156 output = tensor.new(tensor.shape)157 return _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor, process_set)158class HorovodAllreduce(torch.autograd.Function):159 """An autograd function that performs allreduce on a tensor."""160 @staticmethod161 def forward(ctx, tensor, average, name, op, prescale_factor, postscale_factor, process_set):162 ctx.average = average163 ctx.op = op164 ctx.prescale_factor = prescale_factor165 ctx.postscale_factor = postscale_factor166 ctx.process_set = process_set167 handle = allreduce_async(tensor, average, name, op, prescale_factor, postscale_factor, process_set)168 return synchronize(handle)169 @staticmethod170 def backward(ctx, grad_output):171 return allreduce(grad_output, average=ctx.average, op=ctx.op,172 prescale_factor=ctx.prescale_factor,173 postscale_factor=ctx.postscale_factor,174 process_set=ctx.process_set), None, None, None, None, None, None175def allreduce(tensor, average=None, name=None, compression=Compression.none, op=None,176 prescale_factor=1.0, postscale_factor=1.0, process_set=global_process_set):177 """178 A function that performs averaging or summation of the input tensor over all the179 Horovod processes. The input tensor is not modified.180 The reduction operation is keyed by the name. If name is not provided, an incremented181 auto-generated name is used. The tensor type and shape must be the same on all182 Horovod processes for a given name. 
The reduction will not start until all processes183 are ready to send and receive the tensor.184 This acts as a thin wrapper around an autograd function. If your input185 tensor requires gradients, then callings this function will allow gradients186 to be computed and backpropagated.187 Arguments:188 tensor: A tensor to reduce.189 average:190 .. warning:: .. deprecated:: 0.19.0191 Use `op` instead. Will be removed in v0.21.0.192 name: A name of the reduction operation.193 compression: Compression algorithm used during allreduce to reduce the amount194 of data sent during the each parameter update step. Defaults to195 not using compression.196 op: The reduction operation to combine tensors across different ranks. Defaults197 to Average if None is given.198 prescale_factor: Multiplicative factor to scale tensor before allreduce.199 postscale_factor: Multiplicative factor to scale tensor after allreduce.200 process_set: Process set object to limit this operation to a subset of201 Horovod processes. Default is the global process set.202 Returns:203 A tensor of the same shape and type as `tensor`, averaged or summed across all204 processes.205 """206 tensor_compressed, ctx = compression.compress(tensor)207 summed_tensor_compressed = HorovodAllreduce.apply(tensor_compressed, average, name, op,208 prescale_factor, postscale_factor,209 process_set)210 return compression.decompress(summed_tensor_compressed, ctx)211def allreduce_async_(tensor, average=None, name=None, op=None,212 prescale_factor=1.0, postscale_factor=1.0,213 process_set=global_process_set):214 """215 A function that performs asynchronous in-place averaging or summation of the input216 tensor over all the Horovod processes.217 The reduction operation is keyed by the name. If name is not provided, an incremented218 auto-generated name is used. The tensor type and shape must be the same on all219 Horovod processes for a given name. 
The reduction will not start until all processes220 are ready to send and receive the tensor.221 Arguments:222 tensor: A tensor to reduce.223 average:224 .. warning:: .. deprecated:: 0.19.0225 Use `op` instead. Will be removed in v0.21.0.226 name: A name of the reduction operation.227 op: The reduction operation to combine tensors across different ranks. Defaults to228 Average if None is given.229 prescale_factor: Multiplicative factor to scale tensor before allreduce.230 postscale_factor: Multiplicative factor to scale tensor after allreduce.231 process_set: Process set object to limit this operation to a subset of232 Horovod processes. Default is the global process set.233 Returns:234 A handle to the allreduce operation that can be used with `poll()` or235 `synchronize()`.236 """237 op = handle_average_backwards_compatibility(op, average)238 ret = _allreduce_async(tensor, tensor.new(), name, op, prescale_factor, postscale_factor, process_set) if tensor.dtype == torch.int8 or tensor.dtype == torch.float16 else _allreduce_async(tensor, tensor, name, op, prescale_factor, postscale_factor, process_set)239 return ret240def allreduce_(tensor, average=None, name=None, op=None,241 prescale_factor=1.0, postscale_factor=1.0,242 process_set=global_process_set):243 """244 A function that performs in-place averaging or summation of the input tensor over245 all the Horovod processes.246 The reduction operation is keyed by the name. If name is not provided, an incremented247 auto-generated name is used. The tensor type and shape must be the same on all248 Horovod processes for a given name. The reduction will not start until all processes249 are ready to send and receive the tensor.250 Arguments:251 tensor: A tensor to reduce.252 average:253 .. warning:: .. deprecated:: 0.19.0254 Use `op` instead. Will be removed in v0.21.0.255 name: A name of the reduction operation.256 op: The reduction operation to combine tensors across different ranks. 
Defaults to257 Average if None is given.258 prescale_factor: Multiplicative factor to scale tensor before allreduce.259 postscale_factor: Multiplicative factor to scale tensor after allreduce.260 process_set: Process set object to limit this operation to a subset of261 Horovod processes. Default is the global process set.262 Returns:263 A tensor of the same shape and type as `tensor`, averaged or summed across all264 processes.265 """266 handle = allreduce_async_(tensor, average, name, op, prescale_factor, postscale_factor, process_set)267 return synchronize(handle)268def _grouped_allreduce_function_factory(tensor):269 return 'horovod_torch_grouped_allreduce_async_' + tensor.type().replace('.', '_')270def _grouped_allreduce_async(tensors, outputs, name, op, prescale_factor, postscale_factor, process_set: ProcessSet):271 # Set the divisor for reduced gradients to average when necessary272 if op == Average:273 if rocm_built():274 # For ROCm, perform averaging at framework level275 divisor = size()276 op = Sum277 else:278 divisor = 1279 elif op == Adasum:280 if process_set != global_process_set:281 raise NotImplementedError("Adasum does not support non-global process sets yet.")282 if tensors[0].device.type != 'cpu' and gpu_available('torch'):283 if nccl_built():284 if not is_homogeneous():285 raise NotImplementedError('Running GPU Adasum on heterogeneous cluster is not supported yet.')286 elif not num_rank_is_power_2(int(size() / local_size())):287 raise NotImplementedError('Running GPU Adasum with non-power of 2 nodes is not supported yet.')288 if rocm_built():289 # For ROCm, perform averaging at framework level290 divisor = local_size()291 else:292 divisor = 1293 else:294 warnings.warn('Adasum reduction does not currently support GPU reduction using MPI. Tensors are '295 'copied to CPU memory instead. 
To use Adasum for GPU reduction, please compile Horovod '296 'with HOROVOD_GPU_OPERATIONS=NCCL.')297 divisor = 1298 else:299 if not num_rank_is_power_2(size()):300 raise NotImplementedError('Running Adasum with non-power of 2 ranks is not supported yet.')301 divisor = 1302 else:303 divisor = 1304 function = _check_function(_grouped_allreduce_function_factory, tensors[0])305 try:306 handle = getattr(mpi_lib, function)(tensors, outputs, divisor,307 name.encode() if name is not None else _NULL, op,308 prescale_factor, postscale_factor, process_set.process_set_id)309 except RuntimeError as e:310 raise HorovodInternalError(e)311 _handle_map[handle] = (tuple(tensors), tuple(outputs))312 return handle313def grouped_allreduce_async(tensors, average=None, name=None, op=None,314 prescale_factor=1.0, postscale_factor=1.0,315 process_set=global_process_set):316 """317 A function that performs asynchronous averaging or summation of the input tensor318 list over all the Horovod processes. The input tensors are not modified.319 The reduction operations are keyed by the base name. If a base name is not320 provided, an incremented auto-generated base name is used. Reductions are321 performed across tensors in the same list position. The tensor type and322 shape must be the same on all Horovod processes for tensors sharing323 positions in the input tensor list. The reduction will not start until all324 processes are ready to send and receive the tensors.325 Arguments:326 tensors: A list of tensors to reduce.327 average:328 .. warning:: .. deprecated:: 0.19.0329 Use `op` instead. Will be removed in v0.21.0.330 name: A base name to use for the group reduction operation.331 op: The reduction operation to combine tensors across different332 ranks. 
Defaults to Average if None is given.333 prescale_factor: Multiplicative factor to scale tensor before allreduce.334 postscale_factor: Multiplicative factor to scale tensor after allreduce.335 process_set: Process set object to limit this operation to a subset of336 Horovod processes. Default is the global process set.337 Returns:338 A handle to the group allreduce operation that can be used with `poll()` or339 `synchronize()`.340 """341 op = handle_average_backwards_compatibility(op, average)342 outputs = [t.new(t.shape) for t in tensors]343 return _grouped_allreduce_async(tensors, outputs, name, op, prescale_factor, postscale_factor, process_set)344class HorovodGroupedAllreduce(torch.autograd.Function):345 """An autograd function that performs allreduce on a list of tensors."""346 @staticmethod347 def forward(ctx, average, name, op, prescale_factor, postscale_factor, process_set: ProcessSet, *tensors):348 ctx.average = average349 ctx.op = op350 ctx.prescale_factor = prescale_factor351 ctx.postscale_factor = postscale_factor352 ctx.process_set = process_set353 handle = grouped_allreduce_async(list(tensors), average, name, op, prescale_factor, postscale_factor,354 process_set)355 return synchronize(handle)356 @staticmethod357 def backward(ctx, *grad_output):358 grad_reduced = grouped_allreduce(list(grad_output), average=ctx.average, op=ctx.op,359 prescale_factor=ctx.prescale_factor,360 postscale_factor=ctx.postscale_factor,361 process_set=ctx.process_set)362 return (None, None, None, None, None, None, *grad_reduced)363def grouped_allreduce(tensors, average=None, name=None, compression=Compression.none, op=None,364 prescale_factor=1.0, postscale_factor=1.0, process_set=global_process_set):365 """366 A function that performs averaging or summation of the input tensor367 list over all the Horovod processes. The input tensors are not modified.368 The reduction operations are keyed by the base name. 
If a base name is not369 provided, an incremented auto-generated base name is used. Reductions are370 performed across tensors in the same list position. The tensor type and371 shape must be the same on all Horovod processes for tensors sharing372 positions in the input tensor list. The reduction will not start until all373 processes are ready to send and receive the tensors.374 This acts as a thin wrapper around an autograd function. If your input375 tensors require gradients, then calling this function will allow gradients376 to be computed and backpropagated.377 Arguments:378 tensors: A list of tensors to reduce.379 average:380 .. warning:: .. deprecated:: 0.19.0381 Use `op` instead. Will be removed in v0.21.0.382 name: A base name to use for the group reduction operation.383 compression: Compression algorithm used during allreduce to reduce the amount384 of data sent during the each parameter update step. Defaults to385 not using compression.386 op: The reduction operation to combine tensors across different ranks. Defaults387 to Average if None is given.388 prescale_factor: Multiplicative factor to scale tensor before allreduce.389 postscale_factor: Multiplicative factor to scale tensor after allreduce.390 process_set: Process set object to limit this operation to a subset of391 Horovod processes. 
Default is the global process set.392 Returns:393 A list containing tensors of the same shape and type as in `tensors`,394 averaged or summed across all processes.395 """396 tensors_compressed, ctxs = zip(*[compression.compress(t) for t in tensors])397 summed_tensors_compressed = HorovodGroupedAllreduce.apply(average, name, op,398 prescale_factor, postscale_factor,399 process_set, *tensors_compressed)400 return [compression.decompress(t, ctx) for t, ctx in zip(summed_tensors_compressed, ctxs)]401def grouped_allreduce_async_(tensors, average=None, name=None, op=None,402 prescale_factor=1.0, postscale_factor=1.0,403 process_set=global_process_set):404 """405 A function that performs asynchronous in-place averaging or summation of the input406 tensors over all the Horovod processes.407 The reduction operations are keyed by the base name. If a base name is not408 provided, an incremented auto-generated base name is used. Reductions are409 performed across tensors in the same list position. The tensor type and410 shape must be the same on all Horovod processes for tensors sharing411 positions in the input tensor list. The reduction will not start until all412 processes are ready to send and receive the tensors.413 Arguments:414 tensors: A list of tensors to reduce.415 average:416 .. warning:: .. deprecated:: 0.19.0417 Use `op` instead. Will be removed in v0.21.0.418 name: A base name to use for the group reduction operation.419 op: The reduction operation to combine tensors across different ranks. Defaults to420 Average if None is given.421 prescale_factor: Multiplicative factor to scale tensor before allreduce.422 postscale_factor: Multiplicative factor to scale tensor after allreduce.423 process_set: Process set object to limit this operation to a subset of424 Horovod processes. 
Default is the global process set.425 Returns:426 A handle to the group allreduce operation that can be used with `poll()` or427 `synchronize()`.428 """429 op = handle_average_backwards_compatibility(op, average)430 return _grouped_allreduce_async(tensors, tensors, name, op, prescale_factor, postscale_factor, process_set)431def grouped_allreduce_(tensors, average=None, name=None, op=None,432 prescale_factor=1.0, postscale_factor=1.0,433 process_set=global_process_set):434 """435 A function that performs in-place averaging or summation of the input tensors over436 all the Horovod processes.437 The reduction operations are keyed by the base name. If a base name is not438 provided, an incremented auto-generated base name is used. Reductions are439 performed across tensors in the same list position. The tensor type and440 shape must be the same on all Horovod processes for tensors sharing441 positions in the input tensor list. The reduction will not start until all442 processes are ready to send and receive the tensors.443 Arguments:444 tensors: A list of tensors to reduce.445 average:446 .. warning:: .. deprecated:: 0.19.0447 Use `op` instead. Will be removed in v0.21.0.448 name: A base name to use for the group reduction operation.449 op: The reduction operation to combine tensors across different ranks. Defaults to450 Average if None is given.451 prescale_factor: Multiplicative factor to scale tensor before allreduce.452 postscale_factor: Multiplicative factor to scale tensor after allreduce.453 process_set: Process set object to limit this operation to a subset of454 Horovod processes. 
Default is the global process set.455 Returns:456 A list containing tensors of the same shape and type as in `tensors`,457 averaged or summed across all processes.458 """459 handle = grouped_allreduce_async_(tensors, average, name, op, prescale_factor, postscale_factor, process_set)460 return synchronize(handle)461def sparse_allreduce_async(tensor, name, op, process_set=global_process_set):462 # Allgather aggregates along the first dimension, so we need to transpose the463 # indices to enforce correct concatenation behavior, then transpose back prior to464 # constructing the new aggregated sparse gradient465 t = tensor466 indices_handle = allgather_async(t._indices().transpose(0, 1).contiguous(), name=f'{name}.indices',467 process_set=process_set)468 values_handle = allgather_async(t._values(), name=f'{name}.values', process_set=process_set)469 def handle():470 # We need to sync values handle firstly for torch nightly >= 10.0471 # Issue: https://github.com/horovod/horovod/issues/2961472 values = synchronize(values_handle)473 indices = synchronize(indices_handle)474 values = (values / process_set.size()) if op == Average else values475 if indices.dim() == 0 or values.dim() == 0:476 return t.new().resize_as_(t)477 return t.new(indices.transpose(0, 1), values, t.size())478 return handle479def _allgather_function_factory(tensor):480 return 'horovod_torch_allgather_async_' + tensor.type().replace('.', '_')481def _allgather_async(tensor, output, name, process_set: ProcessSet):482 function = _check_function(_allgather_function_factory, tensor)483 try:484 handle = getattr(mpi_lib, function)(485 tensor, output, name.encode() if name is not None else _NULL,486 process_set.process_set_id)487 except RuntimeError as e:488 raise HorovodInternalError(e)489 _handle_map[handle] = (tensor, output)490 return handle491def allgather_async(tensor, name=None, process_set=global_process_set):492 """493 A function that asynchronously concatenates the input tensor with the same input494 
tensor on all other Horovod processes. The input tensor is not modified.495 The concatenation is done on the first dimension, so the input tensors on the496 different processes must have the same rank and shape, except for the first497 dimension, which is allowed to be different.498 Arguments:499 tensor: A tensor to allgather.500 name: A name of the allgather operation.501 process_set: Process set object to limit this operation to a subset of502 Horovod processes. Default is the global process set.503 Returns:504 A handle to the allgather operation that can be used with `poll()` or505 `synchronize()`.506 """507 output = tensor.new()508 return _allgather_async(tensor, output, name, process_set)509class HorovodAllgather(torch.autograd.Function):510 """An autograd function that performs allgather on a tensor."""511 @staticmethod512 def forward(ctx, tensor, name, process_set: ProcessSet):513 ctx.dim = tensor.shape[0]514 ctx.process_set = process_set515 handle = allgather_async(tensor, name, process_set)516 return synchronize(handle)517 @staticmethod518 def backward(ctx, grad_output):519 grad_reduced = allreduce(grad_output, average=True, process_set=ctx.process_set)520 dim_t = torch.IntTensor([ctx.dim])521 dim = allgather(dim_t, process_set=ctx.process_set).view(ctx.process_set.size())522 r = ctx.process_set.rank()523 offset = torch.sum(dim.narrow(0, 0, r)).item() if r != 0 else 0524 return grad_reduced.narrow(0, offset, ctx.dim), None, None525def allgather(tensor, name=None, process_set=global_process_set):526 """527 A function that concatenates the input tensor with the same input tensor on528 all other Horovod processes. The input tensor is not modified.529 The concatenation is done on the first dimension, so the input tensors on the530 different processes must have the same rank and shape, except for the first531 dimension, which is allowed to be different.532 This acts as a thin wrapper around an autograd function. 
If your input533 tensor requires gradients, then callings this function will allow gradients534 to be computed and backpropagated.535 Arguments:536 tensor: A tensor to allgather.537 name: A name of the allgather operation.538 process_set: Process set object to limit this operation to a subset of539 Horovod processes. Default is the global process set.540 Returns:541 A tensor of the same type as `tensor`, concatenated on dimension zero542 across all processes. The shape is identical to the input shape, except for543 the first dimension, which may be greater and is the sum of all first544 dimensions of the tensors in different Horovod processes.545 """546 return HorovodAllgather.apply(tensor, name, process_set)547def _broadcast_function_factory(tensor):548 return 'horovod_torch_broadcast_async_' + tensor.type().replace('.', '_')549def _broadcast_async(tensor, output, root_rank, name, process_set: ProcessSet):550 function = _check_function(_broadcast_function_factory, tensor)551 try:552 handle = getattr(mpi_lib, function)(553 tensor, output, root_rank, name.encode() if name is not None else _NULL,554 process_set.process_set_id)555 except RuntimeError as e:556 raise HorovodInternalError(e)557 _handle_map[handle] = (tensor, output)558 return handle559def broadcast_async(tensor, root_rank, name=None, process_set=global_process_set):560 """561 A function that asynchronously broadcasts the input tensor on root rank to the same562 input tensor on all other Horovod processes. The input tensor is not modified.563 The broadcast operation is keyed by the name. If name is not provided, an incremented564 auto-generated name is used. The tensor type and shape must be the same on all565 Horovod processes for a given name. 
The broadcast will not start until all processes566 are ready to send and receive the tensor.567 Arguments:568 tensor: A tensor to broadcast.569 root_rank: The rank to broadcast the value from.570 name: A name of the broadcast operation.571 process_set: Process set object to limit this operation to a subset of572 Horovod processes. Default is the global process set.573 Returns:574 A handle to the broadcast operation that can be used with `poll()` or575 `synchronize()`.576 """577 output = tensor.new(tensor.shape)578 return _broadcast_async(tensor, output, root_rank, name, process_set)579class HorovodBroadcast(torch.autograd.Function):580 """An autograd function that broadcasts a tensor."""581 @staticmethod582 def forward(ctx, tensor, root_rank, name, process_set: ProcessSet):583 ctx.root_rank = root_rank584 ctx.process_set = process_set585 handle = broadcast_async(tensor, root_rank, name, process_set)586 return synchronize(handle)587 @staticmethod588 def backward(ctx, grad_output):589 grad_reduced = allreduce(grad_output, average=True, process_set=ctx.process_set)590 if rank() != ctx.root_rank:591 grad_reduced *= 0592 return grad_reduced, None, None, None593def broadcast(tensor, root_rank, name=None, process_set=global_process_set):594 """595 A function that broadcasts the input tensor on root rank to the same input tensor596 on all other Horovod processes. The input tensor is not modified.597 The broadcast operation is keyed by the name. If name is not provided, an incremented598 auto-generated name is used. The tensor type and shape must be the same on all599 Horovod processes for a given name. The broadcast will not start until all processes600 are ready to send and receive the tensor.601 This acts as a thin wrapper around an autograd function. 
If your input602 tensor requires gradients, then callings this function will allow gradients603 to be computed and backpropagated.604 Arguments:605 tensor: A tensor to broadcast.606 root_rank: The rank to broadcast the value from.607 name: A name of the broadcast operation.608 process_set: Process set object to limit this operation to a subset of609 Horovod processes. Default is the global process set.610 Returns:611 A tensor of the same shape and type as `tensor`, with the value broadcasted612 from root rank.613 """614 return HorovodBroadcast.apply(tensor, root_rank, name, process_set)615def broadcast_async_(tensor, root_rank, name=None, process_set=global_process_set):616 """617 A function that asynchronously broadcasts the input tensor on root rank to the same618 input tensor on all other Horovod processes. The operation is performed in-place.619 The broadcast operation is keyed by the name. If name is not provided, an incremented620 auto-generated name is used. The tensor type and shape must be the same on all621 Horovod processes for a given name. The broadcast will not start until all processes622 are ready to send and receive the tensor.623 Arguments:624 tensor: A tensor to broadcast.625 root_rank: The rank to broadcast the value from.626 name: A name of the broadcast operation.627 process_set: Process set object to limit this operation to a subset of628 Horovod processes. Default is the global process set.629 Returns:630 A handle to the broadcast operation that can be used with `poll()` or631 `synchronize()`.632 """633 return _broadcast_async(tensor, tensor, root_rank, name, process_set)634def broadcast_(tensor, root_rank, name=None, process_set=global_process_set):635 """636 A function that broadcasts the input tensor on root rank to the same input tensor637 on all other Horovod processes. The operation is performed in-place.638 The broadcast operation is keyed by the name. If name is not provided, an incremented639 auto-generated name is used. 
The tensor type and shape must be the same on all640 Horovod processes for a given name. The broadcast will not start until all processes641 are ready to send and receive the tensor.642 Arguments:643 tensor: A tensor to broadcast.644 root_rank: The rank to broadcast the value from.645 name: A name of the broadcast operation.646 process_set: Process set object to limit this operation to a subset of647 Horovod processes. Default is the global process set.648 Returns:649 A tensor of the same shape and type as `tensor`, with the value broadcasted650 from root rank.651 """652 handle = broadcast_async_(tensor, root_rank, name, process_set)653 return synchronize(handle)654def _alltoall_function_factory(tensor):655 return 'horovod_torch_alltoall_async_' + tensor.type().replace('.', '_')656def _alltoall_async(tensor, splits, output, output_received_splits, name, process_set: ProcessSet):657 if splits is None:658 # If splits not provided, create empty tensor as placeholder659 splits = torch.tensor([], dtype=torch.int32, device='cpu')660 elif not isinstance(splits, torch.Tensor):661 splits = torch.tensor(splits, dtype=torch.int32, device='cpu')662 function = _check_function(_alltoall_function_factory, tensor)663 try:664 handle = getattr(mpi_lib, function)(665 tensor, splits, output, output_received_splits, name.encode() if name is not None else _NULL,666 process_set.process_set_id)667 except RuntimeError as e:668 raise HorovodInternalError(e)669 _handle_map[handle] = (tensor, splits, (output, output_received_splits))670 return handle671def alltoall_async(tensor, splits=None, name=None, process_set=global_process_set):672 """673 A function that scatters slices of the input tensor to all other Horovod processes674 and returns a tensor of gathered slices from all other Horovod processes. 
The input675 tensor is not modified.676 The slicing is done on the first dimension, so the input tensors on677 the different processes must have the same rank and shape, except for the678 first dimension, which is allowed to be different.679 Arguments:680 tensor: A tensor to distribute with alltoall.681 splits: A tensor of integers in rank order describing how many682 elements in `tensor` to send to each worker. Splitting is683 applied along the first dimension of `tensor`. If `splits` is684 not provided, the first dimension is split equally by the685 number of Horovod processes.686 name: A name of the alltoall operation.687 process_set: Process set object to limit this operation to a subset of688 Horovod processes. Default is the global process set.689 Returns:690 A handle to the alltoall operation that can be used with `poll()` or691 `synchronize()`.692 """693 output = tensor.new()694 if isinstance(splits, torch.Tensor):695 output_received_splits = splits.new()696 else:697 output_received_splits = torch.empty(size(), dtype=torch.int32, device='cpu')698 return _alltoall_async(tensor, splits, output, output_received_splits, name, process_set)699class HorovodAlltoall(torch.autograd.Function):700 """An autograd function that performs alltoall on a tensor."""701 @staticmethod702 def forward(ctx, tensor, splits, name, process_set: ProcessSet):703 handle = alltoall_async(tensor, splits, name, process_set)704 output, received_splits = synchronize(handle)705 ctx.process_set = process_set706 ctx.recvsplits = received_splits707 if splits is None:708 return output709 else:710 ctx.mark_non_differentiable(received_splits)711 return output, received_splits712 @staticmethod713 def backward(ctx, grad_output, *dead_gradients):714 grad_wrt_tensor, _ = alltoall(grad_output, splits=ctx.recvsplits,715 process_set=ctx.process_set)716 return grad_wrt_tensor, None, None, None717def alltoall(tensor, splits=None, name=None, process_set=global_process_set):718 """719 A function that 
scatters slices of the input tensor to all other Horovod processes720 and returns a tensor of gathered slices from all other Horovod processes. The input721 tensor is not modified.722 The slicing is done on the first dimension, so the input tensors on723 the different processes must have the same rank and shape, except for the724 first dimension, which is allowed to be different.725 This acts as a thin wrapper around an autograd function. If your input726 tensor requires gradients, then callings this function will allow gradients727 to be computed and backpropagated.728 Arguments:729 tensor: A tensor to distribute with alltoall.730 splits: A tensor of integers in rank order describing how many731 elements in `tensor` to send to each worker. Splitting is732 applied along the first dimension of `tensor`. If `splits` is733 not provided, the first dimension is split equally by the734 number of Horovod processes.735 name: A name of the alltoall operation.736 process_set: Process set object to limit this operation to a subset of737 Horovod processes. Default is the global process set.738 Returns:739 1) A tensor containing the gathered tensor data from all workers.740 2) If `splits` has been provided: A tensor of integers in rank order741 describing how many elements in the output tensor have been received742 from each worker.743 """744 return HorovodAlltoall.apply(tensor, splits, name, process_set)745def poll(handle):746 """747 Polls an allreduce, allgather or broadcast handle to determine whether underlying748 asynchronous operation has completed. 
After `poll()` returns `True`, `synchronize()`749 will return without blocking.750 Arguments:751 handle: A handle returned by an allreduce, allgather or broadcast asynchronous752 operation.753 Returns:754 A flag indicating whether the operation has completed.755 """756 return mpi_lib.horovod_torch_poll(handle) != 0757def synchronize(handle):758 """759 Synchronizes an asynchronous allreduce, allgather, alltoall or broadcast operation until760 it's completed. Returns the result of the operation.761 Arguments:762 handle: A handle returned by an allreduce, allgather, alltoall or broadcast asynchronous763 operation.764 Returns:765 A single output tensor of the operation or a tuple of multiple output tensors.766 """767 if handle not in _handle_map:768 return769 try:770 mpi_lib.horovod_torch_wait_and_clear(handle)771 output = _handle_map.pop(handle)[-1]772 return output773 except RuntimeError as e:774 _handle_map.pop(handle, None)775 raise HorovodInternalError(e)776def join(device=-1) -> int:777 """A function that indicates that the rank finished processing data.778 All ranks that did not call join() continue to process allreduce operations.779 This function blocks Python thread until all ranks join.780 Arguments:781 device: An id of the device to create temprorary zero tensors (default -1, CPU)782 Returns:783 Id of the rank that joined last.784 """785 output = torch.tensor(-1, dtype=torch.int, device=torch.device("cpu"))786 try:787 handle = mpi_lib.horovod_torch_join(output, device)788 except RuntimeError as e:789 raise HorovodInternalError(e)790 _handle_map[handle] = (None, output)791 return synchronize(handle).item()792def barrier(process_set=global_process_set):793 """794 A function that acts as a simple sychronization point for ranks specified795 in the given process group(default to global group). 
Ranks that reach796 this function call will stall until all other ranks have reached.797 Arguments:798 process_set: Process set object to limit this operation to a subset of799 Horovod processes. Default is the global process set.800 """801 try:802 handle = mpi_lib.horovod_torch_barrier(process_set.process_set_id)803 except RuntimeError as e:804 raise HorovodInternalError(e)805 _handle_map[handle] = (None, None)...

Full Screen

Full Screen

event_creator.py

Source:event_creator.py Github

copy

Full Screen

import event_desc_parser
import parse_logger
import time
from evendate_api import post_event_to_evendate
from file_keeper import read_checked_urls, read_completed_urls, read_ignored_urls, write_completed_url


def prepare_msg_header(org, done_list, error_list):
    """Return the report subject: organisation name plus added/failed counts."""
    return "New events for {} +{}/-{}".format(org, len(done_list), len(error_list))


def prepare_msg_text(done_list, error_list):
    """Return the report body.

    Arguments:
        done_list: Iterable of ``(result_url, source_url)`` pairs; one
            ``ADDED`` row is written per pair using the result URL.
        error_list: Iterable of source URLs; one ``ERROR`` row per URL.

    Returns:
        All rows concatenated, each terminated by CRLF.
    """
    rows = ["ADDED " + res_url + "\r\n" for res_url, _ in done_list]
    rows.extend("ERROR " + url + "\r\n" for url in error_list)
    return "".join(rows)


def __process_bunch(file_name, org, processor):
    """Post every not-yet-handled URL of one organisation and e-mail a report.

    The URLs to process are the checked URLs minus the completed and the
    ignored ones. Each URL is parsed with ``processor`` and posted with
    ``post_event_to_evendate``; successes are recorded via
    ``write_completed_url``, failures are collected for the report.

    Arguments:
        file_name: Name passed to the ``file_keeper`` read/write helpers.
        org: Organisation name used in the report subject.
        processor: Callable that receives a URL; its result is passed to
            ``post_event_to_evendate``.
    """
    checked_urls = read_checked_urls(file_name)
    done_urls = read_completed_urls(file_name)
    ignored_urls = read_ignored_urls()
    process_set = checked_urls.difference(done_urls).difference(ignored_urls)
    total = len(process_set)
    done_list = []
    error_list = []
    # The previous progress counter subtracted done_urls from a set that
    # already excluded them, so it always printed 0; count iterations instead.
    for position, url in enumerate(process_set, start=1):
        print("processing {}/{}".format(position, total))
        try:
            res_url, _event_id = post_event_to_evendate(processor(url))
        except Exception as e:
            # Best effort: log the failure and move on to the next URL.
            error_list.append(url)
            parse_logger.log_event_parsing_error(url, e)
            continue
        if res_url is not None:
            write_completed_url(file_name, url)
            done_list.append((res_url, url))
        else:
            error_list.append(url)
        # NOTE(review): pause after each posted URL, presumably to throttle
        # requests; the collapsed listing does not show whether this line was
        # at loop level or inside the else branch - confirm.
        time.sleep(10)
    server = parse_logger.get_email_server()
    try:
        msg_header = prepare_msg_header(org, done_list, error_list)
        parse_logger.send_email(server, msg_header, prepare_msg_text(done_list, error_list))
    finally:
        # Always release the mail server handle, even if sending fails.
        server.quit()


def process_digital_october():
    __process_bunch("digit_october.txt", "Digital October", event_desc_parser.parse_desc_from_digit_october)


def process_planetarium():
    __process_bunch("planetarium.txt", "Planetarium (CHANGE DATES)", event_desc_parser.parse_desc_from_planetarium)


def process_strelka():
    __process_bunch("strelka.txt", "Strelka", event_desc_parser.parse_desc_from_strelka)


def process_tretyako():
    __process_bunch("tretyako.txt", "Tretyakovskay gallery", event_desc_parser.parse_desc_from_tretyako)


def process_garage():
    __process_bunch("garage.txt", "Garage", event_desc_parser.parse_desc_from_garage)


def process_yandex():
    __process_bunch("yandex.txt", "Yandex", event_desc_parser.parse_desc_from_yandex)


def process_flacon():
    __process_bunch("flacon.txt", "Flacon", event_desc_parser.parse_desc_from_flacon)


def process_vinzavod():
    __process_bunch("vinzavod.txt", "Winzavod", event_desc_parser.parse_desc_from_vinzavod)


def process_gorky_park():
    __process_bunch("gorky_park.txt", "Gorky park", event_desc_parser.parse_desc_from_gorky_park)


def process_artplay():
    __process_bunch("artplay.txt", "Artplay", event_desc_parser.parse_desc_from_artplay)


def process_centermars():
    __process_bunch("centermars.txt", "Center Mars", event_desc_parser.parse_desc_from_centermars)


def process_mail():
    __process_bunch("mail.txt", "Mail.ru", event_desc_parser.parse_desc_from_mail)


def process_ditelegraph():
    __process_bunch("ditelegraph.txt", "DI Telegraph", event_desc_parser.parse_desc_from_ditelegraph)


def process_mamm():
    __process_bunch("mamm.txt", "Mamm", event_desc_parser.parse_desc_from_mamm)


def process_all():
    """Run the per-organisation processors one after another."""
    process_strelka()
    process_digital_october()
    process_planetarium()
    # process_tretyako()
    process_garage()
    process_yandex()
    process_flacon()
    process_vinzavod()
    process_gorky_park()
    process_artplay()
    process_centermars()
    process_mail()
    process_ditelegraph()
    # NOTE(review): the visible listing is truncated after this call;
    # process_mamm() is defined above but not called in the visible part -
    # confirm against the full file.

Full Screen

Full Screen

Chuseok_traffic.py

Source:Chuseok_traffic.py Github

copy

Full Screen

from datetime import datetime, timedelta


def solution(lines):
    """Return the peak number of requests being processed in any 1-second window.

    Each log line has the form ``"YYYY-MM-DD HH:MM:SS.mmm <T>s"``: the
    completion timestamp followed by the processing time in seconds. A request
    occupies the closed interval ``[end - T + 1ms, end]``. A window is the
    closed 1000-millisecond span ``[w, w + 999ms]``.

    Only windows that start at a completion time are examined: sliding a
    window to the right can only add requests until its left edge passes a
    completion time, so some best window starts exactly at one.

    Arguments:
        lines: Sequence of log lines in the format above.

    Returns:
        The largest number of requests overlapping a single window, or 0 when
        ``lines`` is empty.
    """
    one_ms = timedelta(milliseconds=1)
    window_span = timedelta(milliseconds=999)

    intervals = []
    for line in lines:
        # First 23 characters are the completion timestamp.
        finished = datetime.strptime(line[0:23], "%Y-%m-%d %H:%M:%S.%f")
        # The remainder is the duration with a trailing "s". Round rather than
        # truncate so float error (e.g. 1.001 * 1000) cannot drop a millisecond.
        duration_ms = round(float(line[24:-1]) * 1000)
        # Both endpoints are inclusive, hence the +1ms on the start.
        started = finished - timedelta(milliseconds=duration_ms) + one_ms
        intervals.append((started, finished))

    best = 0
    for _, window_start in intervals:
        window_end = window_start + window_span
        overlapping = sum(
            1
            for started, finished in intervals
            if started <= window_end and finished >= window_start
        )
        best = max(best, overlapping)
    return best


"""
test case
times1 = ["2016-09-15 20:59:57.421 0.351s",
"2016-09-15 20:59:58.233 1.181s",
"2016-09-15 20:59:58.299 0.8s",
"2016-09-15 20:59:58.688 1.041s",
"2016-09-15 20:59:59.591 1.412s",
"2016-09-15 21:00:00.464 1.466s",
"2016-09-15 21:00:00.741 1.581s",
"2016-09-15 21:00:00.748 2.31s",
"2016-09-15 21:00:00.966 0.381s",
"2016-09-15 21:00:02.066 2.62s"]
result1 = 7
times2 = ["2016-09-15 01:00:04.002 2.0s",
"2016-09-15 01:00:07.000 2s"]
result2 = 2
times3 = ["2016-09-15 01:00:04.001 2.0s",
"2016-09-15 01:00:07.000 2s"]
result3 = 1
"""

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tox automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful