# How to use the devices method in ATX

Best Python code snippet using ATX

all_reduce.py

Source:all_reduce.py

`1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14# ==============================================================================15"""Utilities to construct a TF subgraph implementing distributed All-Reduce."""16from __future__ import absolute_import17from __future__ import division18from __future__ import print_function19import collections20import math21from tensorflow.contrib import nccl22from tensorflow.python.framework import device as device_lib23from tensorflow.python.framework import ops24from tensorflow.python.ops import array_ops25from tensorflow.python.ops import math_ops26def _flatten_tensors(tensors):27 """Check tensors for isomorphism and flatten.28 Args:29 tensors: list of T @{tf.Tensor} which must all have the same shape.30 Returns:31 tensors: a list of T @{tf.Tensor} which are flattened (1D) views of tensors32 shape: the original shape of each element of input tensors33 Raises:34 ValueError: tensors are empty or non-isomorphic or have unknown shape.35 """36 if not tensors:37 raise ValueError("tensors cannot be empty")38 shape = tensors[0].shape39 for tensor in tensors:40 shape = shape.merge_with(tensor.shape)41 if not shape.is_fully_defined():42 raise ValueError("Tensors must have statically known shape.")43 if len(shape) != 1:44 reshaped = []45 for t in tensors:46 with ops.colocate_with(t):47 reshaped.append(array_ops.reshape(t, [-1]))48 tensors = reshaped49 return tensors, shape50def 
_reshape_tensors(tensors, shape):51 """Reshape tensors flattened by _flatten_tensors.52 Args:53 tensors: list of T @{tf.Tensor} of identical length 1D tensors.54 shape: list of integers describing the desired shape. Product of55 the elements must equal the length of each tensor.56 Returns:57 list of T @{tf.Tensor} which are the reshaped inputs.58 """59 reshaped = []60 for t in tensors:61 with ops.colocate_with(t):62 reshaped.append(array_ops.reshape(t, shape))63 return reshaped64def _padded_split(tensor, pieces):65 """Like split for 1D tensors but pads-out case where len % pieces != 0.66 Args:67 tensor: T @{tf.Tensor} that must be 1D.68 pieces: a positive integer specifying the number of pieces into which69 tensor should be split.70 Returns:71 list of T @{tf.Tensor} of length pieces, which hold the values of72 thin input tensor, in order. The final tensor may73 be zero-padded on the end to make its size equal to those of all74 of the other tensors.75 Raises:76 ValueError: The input tensor is not 1D.77 """78 shape = tensor.shape79 if 1 != len(shape):80 raise ValueError("input tensor must be 1D")81 tensor_len = shape[0].value82 with ops.colocate_with(tensor):83 if tensor_len % pieces != 0:84 # pad to an even length85 chunk_size = 1 + tensor_len // pieces86 if pieces > tensor_len:87 # This is an edge case that should not come up in practice,88 # i.e. 
a different reduction algorithm would be better,89 # but we'll make it work just for completeness.90 pad_len = pieces - tensor_len91 extended_whole = array_ops.concat(92 [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)93 parts = array_ops.split(extended_whole, pieces)94 return parts, pad_len95 elif (pieces - 1) * chunk_size >= tensor_len:96 # Another edge case of limited real interest.97 pad_len = (pieces * chunk_size) % tensor_len98 extended_whole = array_ops.concat(99 [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)100 parts = array_ops.split(extended_whole, pieces)101 return parts, pad_len102 else:103 last_chunk_size = tensor_len - (pieces - 1) * chunk_size104 pad_len = chunk_size - last_chunk_size105 piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]106 parts = array_ops.split(tensor, piece_lens)107 parts[-1] = array_ops.concat(108 [parts[-1], array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)109 return parts, pad_len110 else:111 return array_ops.split(tensor, pieces), 0112def _strip_padding(tensors, pad_len):113 """Strip the suffix padding added by _padded_split.114 Args:115 tensors: list of T @{tf.Tensor} of identical length 1D tensors.116 pad_len: number of elements to be stripped from the end of each tensor.117 Returns:118 list of T @{tf.Tensor} which are the stripped inputs.119 Raises:120 ValueError: tensors must be a non-empty list of 1D tensors, and121 each must be longer than pad_len.122 """123 if not tensors:124 raise ValueError("tensors cannot be empty")125 shape = tensors[0].shape126 if len(shape) > 1:127 raise ValueError("tensors must be 1D")128 prefix_len = int(shape[0] - pad_len)129 if prefix_len < 0:130 raise ValueError("pad_len longer than tensor")131 stripped = []132 for t in tensors:133 with ops.colocate_with(t):134 stripped.append(array_ops.slice(t, [0], [prefix_len]))135 return stripped136def _ragged_split(tensor, pieces):137 """Like split for 1D tensors but allows case where len % pieces != 
0.138 Args:139 tensor: T @{tf.Tensor} that must be 1D.140 pieces: a positive integer specifying the number of pieces into which141 tensor should be split.142 Returns:143 list of T @{tf.Tensor} of length pieces, which hold the values of144 the input tensor, in order. The final tensor may be shorter145 than the others, which will all be of equal length.146 Raises:147 ValueError: input tensor must be 1D.148 """149 shape = tensor.shape150 if 1 != len(shape):151 raise ValueError("input tensor must be 1D")152 tensor_len = shape[0].value153 chunk_size = tensor_len // pieces154 with ops.colocate_with(tensor):155 if tensor_len != (pieces * chunk_size):156 # last piece will be short157 assert pieces > 1158 last_chunk_size = tensor_len - ((pieces - 1) * chunk_size)159 assert last_chunk_size > 0160 piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]161 return array_ops.split(tensor, piece_lens)162 else:163 return array_ops.split(tensor, pieces)164def _ring_permutations(num_workers, num_subchunks, gpu_perm):165 """"Generate an array of device index arrays, one for each subchunk.166 In the basic ring reduction algorithm there are size(T)/num_devices167 data chunks and each device process one chunk per tick, i.e. sending168 one chunk and receiving one chunk. The idea of subchunking is that169 each device processes num_subchunks smaller data regions per tick,170 and the ring rank permutation is different for each subchunk index171 so that a device is potentially sending to and receiving from172 num_subchunks different other devices at each tick. Where multiple173 independent data channels exist between devices, this strategy174 supplies a method of using them in parallel.175 Args:176 num_workers: number of worker tasks177 num_subchunks: number of subchunks into which to divide each per-GPU chunk.178 gpu_perm: an array of integers in [0, num_gpus-1] giving the default179 ring order of GPUs at each worker. 
Other permutations will be generated180 by rotating this array and splicing together per-worker instances.181 Raises:182 ValueError: the number of subchunks may not exceed the number of GPUs.183 Returns:184 pred_by_s_d: list of lists that maps (by index) from (subchunk, dev) to185 preceding device in the permutation for that subchunk. The186 device index of GPU i at worker j is i + (j * num_gpus).187 rank_by_s_d: list of lists that maps (by index) from (subchunk, dev) to188 local rank of device d in the permutation for that subchunk.189 """190 num_gpus = len(gpu_perm)191 devices = num_workers * num_gpus192 if devices == 0:193 return [], []194 if num_subchunks > num_gpus:195 raise ValueError(196 "num_subchunks %d must be <= num_gpus %d" % (num_subchunks, num_gpus))197 rotation_interval = max(1, int(num_gpus / num_subchunks))198 perms_by_s = []199 for s in range(0, num_subchunks):200 full_order = []201 offset = s * rotation_interval202 for w in range(0, num_workers):203 default_order = [(w * num_gpus) + i for i in gpu_perm]204 dev_order = default_order[offset:] + default_order[:offset]205 full_order += dev_order206 perms_by_s.append(full_order)207 pred_by_s_d = [[-1 for d in range(0, devices)]208 for s in range(0, num_subchunks)]209 rank_by_s_d = [[-1 for d in range(0, devices)]210 for s in range(0, num_subchunks)]211 for s in range(0, num_subchunks):212 for d in range(0, devices):213 for t in range(0, devices):214 if d == perms_by_s[s][t]:215 rank_by_s_d[s][d] = t216 pred_by_s_d[s][d] = perms_by_s[s][(t + devices - 1) % devices]217 break218 return (pred_by_s_d, rank_by_s_d)219def build_ring_all_reduce(input_tensors, num_workers, num_subchunks,220 gpu_perm, red_op, un_op=None):221 """Construct a subgraph performing a ring-style all-reduce of input_tensors.222 Args:223 input_tensors: a list of T @{tf.Tensor} objects, which must all224 have the same shape and type.225 num_workers: number of worker tasks spanned by input_tensors.226 num_subchunks: number of subchunks 
each device should process in one tick.227 gpu_perm: a list of ints giving a ring-wise rank ordering of GPUs at228 each worker. All workers must have the same number of229 GPUs with the same rank ordering. If NVLINK is available, this should230 be a ring order supported by NVLINK edges.231 red_op: a binary operator for elementwise reduction.232 un_op: an optional unary operator to apply to fully reduced values.233 Raises:234 ValueError: empty input_tensors or they don't all have same235 size.236 Returns:237 a list of T @{tf.Tensor} identical sum-reductions of input_tensors.238 """239 if len(input_tensors) < 2:240 raise ValueError("input_tensors must be length 2 or longer")241 input_tensors, shape = _flatten_tensors(input_tensors)242 devices = [t.device for t in input_tensors]243 (pred_by_s_d, rank_by_s_d) = _ring_permutations(244 num_workers, num_subchunks, gpu_perm)245 chunks_by_dev, pad_len = _build_ring_gather(246 input_tensors, devices,247 num_subchunks, pred_by_s_d, rank_by_s_d, red_op)248 if un_op:249 chunks_by_dev = _apply_unary_to_chunks(un_op, chunks_by_dev)250 output_tensors = _build_ring_scatter(pred_by_s_d, rank_by_s_d,251 chunks_by_dev)252 if pad_len > 0:253 output_tensors = _strip_padding(output_tensors, pad_len)254 if len(shape) != 1:255 output_tensors = _reshape_tensors(output_tensors, shape)256 return output_tensors257def _build_ring_gather(input_tensors, devices, num_subchunks,258 pred_by_s_d, rank_by_s_d, red_op):259 """Construct a subgraph for the first (reduction) pass of ring all-reduce.260 Args:261 input_tensors: a list of T @{tf.Tensor} 1D input tensors of same262 shape and type.263 devices: array of device name strings264 num_subchunks: number of subchunks each device should process in one tick.265 pred_by_s_d: as produced by _ring_permutations266 rank_by_s_d: as produced by _ring_permutations267 red_op: a binary operator for elementwise reduction268 Raises:269 ValueError: tensors must all be one dimensional.270 Returns:271 list of list of 
T @{tf.Tensor} of (partially) reduced values where272 exactly num_subchunks chunks at each device are fully reduced.273 """274 num_devices = len(input_tensors)275 if num_devices == 0:276 return []277 if num_devices == 1:278 return input_tensors279 shape = input_tensors[0].shape280 if 1 != len(shape):281 raise ValueError("input tensors must be 1D")282 num_chunks = num_devices * num_subchunks283 num_ticks = num_devices - 1284 # Initialize chunks_by_dev with splits of the input tensors.285 chunks_by_dev = []286 split_pad_len = 0287 for d in range(0, num_devices):288 with ops.device(devices[d]):289 splits, split_pad_len = _padded_split(input_tensors[d], num_chunks)290 chunks_by_dev.append(splits)291 # Reduction phase292 for tick in range(0, num_ticks):293 # One new partial reduction for every chunk294 new_partial_reductions = [None for _ in range(0, num_chunks)]295 # Compute reductions with respect to last tick's values296 for d in range(0, num_devices):297 with ops.device(devices[d]):298 for s in range(0, num_subchunks):299 rank = rank_by_s_d[s][d]300 seg_index = (rank + num_devices - (2 + tick)) % num_devices301 pred_dev = pred_by_s_d[s][d]302 chunk_index = (seg_index * num_subchunks) + s303 new_partial_reductions[chunk_index] = red_op(304 chunks_by_dev[pred_dev][chunk_index],305 chunks_by_dev[d][chunk_index])306 # Update chunks_by_dev with the new values at the end of the tick.307 for d in range(0, num_devices):308 for s in range(0, num_subchunks):309 rank = rank_by_s_d[s][d]310 seg_index = (rank + num_devices - (2 + tick)) % num_devices311 chunk_index = (seg_index * num_subchunks) + s312 chunks_by_dev[d][chunk_index] = new_partial_reductions[chunk_index]313 return chunks_by_dev, split_pad_len314def _apply_unary_to_chunks(f, chunks_by_dev):315 """Apply a unary op to each tensor in chunks_by_dev, on same device.316 Args:317 f: a unary function over T @{tf.Tensor}.318 chunks_by_dev: list of lists of T @{tf.Tensor}.319 Returns:320 new list of lists of T @{tf.Tensor} 
with the same structure as321 chunks_by_dev containing the derived tensors.322 """323 output = []324 for x in chunks_by_dev:325 with ops.colocate_with(x[0]):326 output.append([f(t) for t in x])327 return output328def _build_ring_scatter(pred_by_s_d, rank_by_s_d,329 chunks_by_dev):330 """Construct subgraph for second (scatter) pass of ring all-reduce.331 Args:332 pred_by_s_d: as produced by _ring_permutations333 rank_by_s_d: as produced by _ring_permutations334 chunks_by_dev: list of list of T @{tf.Tensor} indexed by ints335 (device, chunk)336 Raises:337 ValueError: chunks_by_dev is not well-formed338 Returns:339 list of T @{tf.Tensor} which are the fully reduced tensors, one340 at each device corresponding to the outer dimension of chunks_by_dev.341 """342 num_devices = len(chunks_by_dev)343 num_chunks = len(chunks_by_dev[0])344 if 0 != num_chunks % num_devices:345 raise ValueError(346 "Expect number of chunks per device to be divisible by num_devices")347 num_subchunks = int(num_chunks / num_devices)348 num_ticks = num_devices - 1349 for tick in range(0, num_ticks):350 passed_values = [None for _ in range(0, num_chunks)]351 for d in range(0, num_devices):352 with ops.colocate_with(chunks_by_dev[d][0]):353 for s in range(0, num_subchunks):354 rank = rank_by_s_d[s][d]355 seg_index = (rank + num_devices - (1 + tick)) % num_devices356 pred_dev = pred_by_s_d[s][d]357 chunk_index = (seg_index * num_subchunks) + s358 passed_values[chunk_index] = array_ops.identity(359 chunks_by_dev[pred_dev][chunk_index])360 for d in range(0, num_devices):361 for s in range(0, num_subchunks):362 rank = rank_by_s_d[s][d]363 seg_index = (rank + num_devices - (1 + tick)) % num_devices364 chunk_index = (seg_index * num_subchunks) + s365 chunks_by_dev[d][chunk_index] = passed_values[chunk_index]366 # Join chunks at each device.367 output = []368 for x in chunks_by_dev:369 with ops.colocate_with(x[0]):370 output.append(array_ops.concat(x, 0))371 return output372def 
build_recursive_hd_all_reduce(input_tensors, red_op, un_op=None):373 """Construct a subgraph for recursive halving-doubling all-reduce.374 The recursive halving-doubling algorithm is described in375 http://www.mcs.anl.gov/~thakur/papers/ijhpca-coll.pdf376 The concept is to arrange the participating n devices in377 a linear sequence where devices exchange data pairwise378 with one other device in each round. During the gather379 phase there are lg(n) rounds where devices exchange380 increasingly smaller sub-tensors with another device381 at increasingly greater distances, until at the top382 each device has 1/n of the fully reduced values. During the383 scatter phase each device exchanges its fully reduced384 sub-tensor (which doubles in length at each round)385 with one other device at increasingly smaller distances386 until each device has all of the fully reduced values.387 Note: this preliminary version requires that len(input_tensors) be a388 power of 2. TODO(tucker): relax this restriction. Also, the389 number of elements in each tensor must be divisible by 2^h where h390 is the number of hops in each phase. 
This will also be relaxed in391 the future with edge-case specific logic.392 Args:393 input_tensors: list of T @{tf.Tensor} to be elementwise reduced.394 red_op: a binary elementwise reduction Op.395 un_op: an optional unary elementwise Op to apply to reduced values.396 Returns:397 list of T @{tf.Tensor} which are the fully reduced tensors, one398 at each device of input_tensors.399 Raises:400 ValueError: num_devices not a power of 2, or tensor len not divisible401 by 2 the proper number of times.402 """403 devices = [t.device for t in input_tensors]404 input_tensors, shape = _flatten_tensors(input_tensors)405 reduced_shards = _build_recursive_hd_gather(input_tensors, devices, red_op)406 if un_op:407 reduced_shards = [un_op(t) for t in reduced_shards]408 output_tensors = _build_recursive_hd_scatter(reduced_shards, devices)409 if len(shape) != 1:410 output_tensors = _reshape_tensors(output_tensors, shape)411 return output_tensors412def _build_recursive_hd_gather(input_tensors, devices, red_op):413 """Construct the gather phase of recursive halving-doubling all-reduce.414 Args:415 input_tensors: list of T @{tf.Tensor} to be elementwise reduced.416 devices: a list of strings naming the devices hosting input_tensors,417 which will also be used to host the (partial) reduction values.418 red_op: a binary elementwise reduction Op.419 Returns:420 list of T @{tf.Tensor} which are the fully reduced tensor shards.421 Raises:422 ValueError: num_devices not a power of 2, or tensor len not divisible423 by 2 the proper number of times.424 """425 num_devices = len(devices)426 num_hops = int(math.log(num_devices, 2))427 if num_devices != (2 ** num_hops):428 raise ValueError("num_devices must be a power of 2")429 chunks = input_tensors430 for h in range(0, num_hops):431 span = 2 ** h432 group_size = span * 2433 new_chunks = [[] for _ in devices]434 for d in range(0, num_devices):435 if (d % group_size) >= (group_size / 2):436 # skip right half of a pair437 continue438 left_dev = 
devices[d]439 right_dev = devices[d + span]440 left_split = array_ops.split(chunks[d], 2)441 right_split = array_ops.split(chunks[d+span], 2)442 with ops.device(left_dev):443 new_chunks[d] = red_op(left_split[0], right_split[0])444 with ops.device(right_dev):445 new_chunks[d + span] = red_op(left_split[1], right_split[1])446 chunks = new_chunks447 return chunks448def _build_recursive_hd_scatter(input_tensors, devices):449 """Construct the scatter phase of recursive halving-doublng all-reduce.450 Args:451 input_tensors: list of T @{tf.Tensor} that are fully-reduced shards.452 devices: a list of strings naming the devices on which the reconstituted453 full tensors should be placed.454 Returns:455 list of T @{tf.Tensor} which are the fully reduced tensors.456 """457 num_devices = len(devices)458 num_hops = int(math.log(num_devices, 2))459 assert num_devices == (2 ** num_hops), "num_devices must be a power of 2"460 chunks = input_tensors461 for h in reversed(range(0, num_hops)):462 span = 2 ** h463 group_size = span * 2464 new_chunks = [[] for _ in devices]465 for d in range(0, num_devices):466 if (d % group_size) >= (group_size / 2):467 # skip right half of a pair468 continue469 left_idx = d470 right_idx = d + span471 left_dev = devices[left_idx]472 right_dev = devices[right_idx]473 with ops.device(left_dev):474 new_chunks[left_idx] = array_ops.concat([chunks[left_idx],475 chunks[right_idx]], 0)476 with ops.device(right_dev):477 new_chunks[right_idx] = array_ops.concat([chunks[left_idx],478 chunks[right_idx]], 0)479 chunks = new_chunks480 return chunks481def build_shuffle_all_reduce(input_tensors, gather_devices, red_op, un_op=None):482 """Construct a subgraph for shuffle all-reduce.483 Shuffle reduce is essentially the algorithm implemented when using484 parameter servers. Suppose tensor length is n, there are d devices485 and g gather shards. Each device sends a n/g length sub-tensor to486 each gather shard. 
The gather shards perform a reduction across d487 fragments, then broadcast the result back to each device. The488 devices then join the g fully reduced fragments they receive from489 the shards. The gather shards could perform d-1 pairwise490 reductions, or one d-way reduction. The first is better where491 reduction Op time is low compared to transmission time, the second492 better in the other case.493 Args:494 input_tensors: list of T @(tf.Tensor} values to be reduced.495 gather_devices: list of names of devices on which reduction shards496 should be placed.497 red_op: an n-array elementwise reduction Op498 un_op: optional elementwise unary Op to be applied to fully-reduced values.499 Returns:500 list of T @{tf.Tensor} which are the fully reduced tensors.501 """502 input_tensors, shape = _flatten_tensors(input_tensors)503 dst_devices = [t.device for t in input_tensors]504 reduced_shards = _build_shuffle_gather(input_tensors, gather_devices,505 red_op, un_op)506 output_tensors = _build_shuffle_scatter(reduced_shards, dst_devices)507 if len(shape) != 1:508 output_tensors = _reshape_tensors(output_tensors, shape)509 return output_tensors510def _build_shuffle_gather(input_tensors, gather_devices, red_op, un_op=None):511 """Construct the gather (concentrate and reduce) phase of shuffle all-reduce.512 Args:513 input_tensors: list of T @(tf.Tensor} values to be reduced.514 gather_devices: list of names of devices on which reduction shards515 should be placed.516 red_op: the binary reduction Op517 un_op: optional elementwise unary Op to be applied to fully-reduced values.518 Returns:519 list of T @{tf.Tensor} which are the fully reduced shards.520 Raises:521 ValueError: inputs not well-formed.522 """523 num_source_devices = len(input_tensors)524 num_gather_devices = len(gather_devices)525 shape = input_tensors[0].shape526 if len(shape) != 1:527 raise ValueError("input_tensors must be 1D")528 shards_by_source = []529 for d in range(0, num_source_devices):530 with 
ops.colocate_with(input_tensors[d]):531 shards_by_source.append(532 _ragged_split(input_tensors[d], num_gather_devices))533 reduced_shards = []534 for d in range(0, num_gather_devices):535 with ops.device(gather_devices[d]):536 values = [s[d] for s in shards_by_source]537 red_shard = red_op(values)538 if un_op:539 red_shard = un_op(red_shard)540 reduced_shards.append(red_shard)541 return reduced_shards542def _build_shuffle_scatter(reduced_shards, dst_devices):543 """Build the scatter phase of shuffle all-reduce.544 Args:545 reduced_shards: list of T @(tf.Tensor} fully reduced shards546 dst_devices: list of names of devices at which the fully-reduced value547 should be reconstituted.548 Returns:549 list of T @{tf.Tensor} scattered tensors.550 """551 num_devices = len(dst_devices)552 out_tensors = []553 for d in range(0, num_devices):554 with ops.device(dst_devices[d]):555 out_tensors.append(array_ops.concat(reduced_shards, 0))556 return out_tensors557def _split_by_task(devices, values):558 """Partition devices and values by common task.559 Args:560 devices: list of device name strings561 values: list of T @{tf.tensor} of same length as devices.562 Returns:563 (per_task_devices, per_task_values) where both values are564 lists of lists with isomorphic structure: the outer list is565 indexed by task, and the inner list has length of the number566 of values belonging to that task. 
per_task_devices contains567 the specific devices to which the values are local, and568 per_task_values contains the corresponding values.569 Raises:570 ValueError: devices must be same length as values.571 """572 num_devices = len(devices)573 if num_devices != len(values):574 raise ValueError("len(devices) must equal len(values)")575 per_task_devices = collections.OrderedDict()576 per_task_values = collections.OrderedDict()577 for d in range(num_devices):578 d_spec = device_lib.DeviceSpec.from_string(devices[d])579 if not hasattr(d_spec, "task") or d_spec.task is None:580 assert False, "failed to parse device %s" % devices[d]581 index = (d_spec.job or "localhost", d_spec.replica or 0, d_spec.task)582 if index not in per_task_devices:583 per_task_devices[index] = []584 per_task_values[index] = []585 per_task_devices[index].append(devices[d])586 per_task_values[index].append(values[d])587 return (list(per_task_devices.values()), list(per_task_values.values()))588def build_nccl_all_reduce(input_tensors, red_op, un_op=None):589 """Build a subgraph that does one full all-reduce, using NCCL.590 Args:591 input_tensors: list of T @{tf.Tensor} of same-shape and type values to592 be reduced.593 red_op: binary elementwise reduction operator. 
Must be one of594 {tf.add}595 un_op: optional unary elementwise Op to apply to fully-reduce values.596 Returns:597 list of T @{tf.Tensor} of reduced values.598 Raises:599 ValueError: red_op not supported.600 """601 if red_op == math_ops.add:602 output_tensors = nccl.all_sum(input_tensors)603 else:604 raise ValueError("red_op not supported by NCCL all-reduce: ", red_op)605 if un_op:606 un_op_wrapped = []607 for t in output_tensors:608 with ops.colocate_with(t):609 un_op_wrapped.append(un_op(t))610 output_tensors = un_op_wrapped611 return output_tensors612def _build_nccl_hybrid(input_tensors, red_op, upper_level_f):613 """Construct a subgraph for NCCL hybrid all-reduce.614 Args:615 input_tensors: list of T @{tf.Tensor} of same-shape and type values to616 be reduced.617 red_op: binary elementwise reduction operator.618 upper_level_f: function for reducing one value per worker, across619 workers.620 Returns:621 list of T @{tf.Tensor} of reduced values.622 Raises:623 ValueError: inputs not well-formed.624 """625 input_tensors, shape = _flatten_tensors(input_tensors)626 devices = [t.device for t in input_tensors]627 per_worker_devices, per_worker_values = _split_by_task(devices, input_tensors)628 num_workers = len(per_worker_devices)629 up_values = [None for w in range(0, num_workers)]630 up_devices = up_values[:]631 down_values = up_values[:]632 # First stage: reduce within each worker using NCCL633 for w in range(0, num_workers):634 worker_values = build_nccl_all_reduce(per_worker_values[w], red_op)635 # NOTE: these reductions will not run to completion unless636 # every output value is used. 
Since we only need one, we637 # need to put control dependencies on the rest.638 with ops.control_dependencies(worker_values):639 with ops.device(worker_values[0].device):640 up_values[w] = array_ops.identity(worker_values[0])641 up_devices[w] = per_worker_devices[w][0]642 # Second stage: Apply upper_level_f to reduce across first device at643 # each worker644 level_2_output = upper_level_f(up_values)645 # Third stage: propagate within each worker using NCCL Broadcast646 for w in range(0, num_workers):647 dst_tensors = []648 with ops.device(per_worker_devices[w][0]):649 broadcast_src = nccl.broadcast(array_ops.identity(level_2_output[w]))650 for d in per_worker_devices[w]:651 with ops.device(d):652 dst_tensors.append(array_ops.identity(broadcast_src))653 down_values[w] = dst_tensors654 output_tensors = [v for sublist in down_values for v in sublist]655 if len(shape) != 1:656 output_tensors = _reshape_tensors(output_tensors, shape)657 return output_tensors658def _reduce_non_singleton(input_tensors, red_f, un_op):659 """If input_tensors has more than one element apply red_f, else apply un_op."""660 if len(input_tensors) > 1:661 return red_f(input_tensors)662 else:663 if not un_op:664 return input_tensors665 output_tensors = []666 for t in input_tensors:667 with ops.colocate_with(t):668 output_tensors.append(un_op(t))669 return output_tensors670def build_nccl_then_ring(input_tensors, subdiv, red_op, un_op=None):671 """Construct hybrid of NCCL within workers, Ring across workers."""672 def upper_builder(y):673 return build_ring_all_reduce(y, len(y), subdiv, [0], red_op, un_op)674 def upper_level_f(x):675 return _reduce_non_singleton(x, upper_builder, un_op)676 return _build_nccl_hybrid(input_tensors, red_op, upper_level_f)677def build_nccl_then_recursive_hd(input_tensors, red_op, un_op=None):678 """Construct hybrid of NCCL within workers, Recursive-HD across workers."""679 upper_level_f = lambda x: build_recursive_hd_all_reduce(x, red_op, un_op)680 return 
_build_nccl_hybrid(input_tensors, red_op, upper_level_f)681def build_nccl_then_shuffle(input_tensors, gather_devices, nccl_red_op,682 shuffle_red_op, un_op=None):683 """Construct hybrid of NCCL within workers, Shuffle across workers."""684 upper_level_f = lambda x: build_shuffle_all_reduce(x, gather_devices,685 shuffle_red_op, un_op)686 return _build_nccl_hybrid(input_tensors, nccl_red_op, upper_level_f)687def _build_shuffle_hybrid(input_tensors, gather_devices, red_op, upper_level_f):688 """Construct a subgraph for Shuffle hybrid all-reduce.689 Args:690 input_tensors: list of T @{tf.Tensor} of same-shape and type values to691 be reduced.692 gather_devices: list of device names on which to host gather shards.693 red_op: binary elementwise reduction operator.694 upper_level_f: function for reducing one value per worker, across695 workers.696 Returns:697 list of T @{tf.Tensor} of reduced values.698 Raises:699 ValueError: inputs not well-formed.700 """701 input_tensors, shape = _flatten_tensors(input_tensors)702 # First stage, reduce across each worker using gather_devices.703 devices = [t.device for t in input_tensors]704 per_worker_devices, per_worker_values = _split_by_task(devices, input_tensors)705 num_workers = len(per_worker_devices)706 up_values = []707 if len(gather_devices) != num_workers:708 raise ValueError("For shuffle hybrid, gather_devices must contain one "709 "device per worker. 
")710 for w in range(0, num_workers):711 reduced_shards = _build_shuffle_gather(712 per_worker_values[w], [gather_devices[w]], red_op)713 up_values.append(reduced_shards[0])714 # Second stage, apply upper_level_f.715 level_2_output = upper_level_f(up_values)716 # Third stage, apply shuffle scatter at each worker.717 output_tensors = []718 for w in range(0, num_workers):719 output_tensors += _build_shuffle_scatter(720 [level_2_output[w]], per_worker_devices[w])721 if len(shape) != 1:722 output_tensors = _reshape_tensors(output_tensors, shape)723 return output_tensors724def build_shuffle_then_ring(input_tensors, gather_devices, subdiv,725 red_n_op, red_op, un_op=None):726 """Construct hybrid of Shuffle within workers, Ring across workers."""727 def upper_builder(tensors):728 return build_ring_all_reduce(tensors, len(tensors), subdiv, [0],729 red_op, un_op)730 def upper_level_f(tensors):731 return _reduce_non_singleton(tensors, upper_builder, un_op)732 return _build_shuffle_hybrid(733 input_tensors, gather_devices, red_n_op, upper_level_f)734def build_shuffle_then_shuffle(input_tensors, first_gather_devices,735 second_gather_devices, red_op, un_op=None):736 """Construct hybrid of Shuffle within workers, Shuffle across workers."""737 def upper_builder(tensors):738 return build_shuffle_all_reduce(tensors, second_gather_devices,739 red_op, un_op)740 def upper_level_f(tensors):741 return _reduce_non_singleton(tensors, upper_builder, un_op)742 return _build_shuffle_hybrid(...`

app.py

Source:app.py

import paho.mqtt.client as mqtt
from flask import Flask,g,render_template,request,Response
from flask_cors import CORS
from flask_socketio import SocketIO, emit
from flask_apscheduler import APScheduler
from libDB import logDB,clearOldDB,queryDB
import numpy as np
import json
from libPickle import *
from pyimagesearch.motion_detection import SingleMotionDetector
from imutils.video import VideoStream
import threading
import argparse
import datetime
import imutils
import schedule
import time
import cv2
#edit by chrome
"""
Date storage
- hot : pickle devices data
- cold :
- frozen : sqlite3
"""


class Config(object):
    """Flask configuration consumed by ``app.config.from_object``."""
    SCHEDULER_API_ENABLED = True
    # NOTE(review): hard-coded secret; load it from the environment before
    # exposing this app outside a trusted network.
    SECRET_KEY = 'secret!'
    threaded=True


scheduler = APScheduler()
# initialize the output frame and a lock used to ensure thread-safe
# exchanges of the output frames (useful when multiple browsers/tabs
# are viewing the stream)
outputFrame = None
lock = threading.Lock()
app = Flask(__name__)
app.config.from_object(Config())
CORS(app)
socketio = SocketIO(app)
# initialize the video stream and allow the camera sensor to
# warmup
#vs = VideoStream(usePiCamera=1).start()
vs = VideoStream(src=0).start()


def _new_device(floor, position, obj, cmd, kind):
    """Return a default device record; its MQTT topic is floor/position/object/cmd."""
    return {
        'floor': floor,
        'position': position,
        'object': obj,
        'cmd': cmd,
        'state': 'False',
        'type': kind,
        'topic': '/'.join((floor, position, obj, cmd)),
        'temp': '0',
        'humid': '0',
        'th_enable': 'False',
        'time_enable': 'False',
    }


def _persist(obj, name):
    """Best-effort save of *obj* to the pickle store under *name*.

    A failed save is reported on stdout and otherwise ignored so that a
    storage problem never breaks a request, scheduler job or MQTT callback.
    """
    try:
        save_pickle_obj(obj, name)
    except Exception as exc:
        print("error")
        print(repr(exc))


# Create a dictionary called devices to store the device number, name, and device state:
devices = {
    'one': _new_device('fl1', 'garage', 'door', '01', 'on/off'),
    'two': _new_device('fl1', 'garage', 'door', '02', 'latch'),
    'three': _new_device('fl1', 'com', 'light', '01', 'on/off'),
    'four': _new_device('fl1', 'com', 'light', '02', 'on/off'),
    'five': _new_device('fl2', 'liv', 'fan', '01', 'on/off'),
    'six': _new_device('fl2', 'cen', 'light', '01', 'on/off'),
    'seven': _new_device('fl3', 'bed', 'fan', '01', 'on/off'),
}
try:
    devices = load_pickle_obj('devices')
    # print(devices)
    print("devices exist")
except Exception:
    # Nothing usable stored yet: seed the store with the defaults above.
    save_pickle_obj(devices,'devices')

sensors = {
    'one' : {'name' : 'sensorName1'}
}

jobCron = {
    'startHour':'14',
    'startMinute':'00',
    'stopHour':'14',
    'stopMinute':'30'
}
try:
    jobCron = load_pickle_obj('jobCron')
    # print(jobCron)
    print("jobCron exist")
except Exception:
    # Nothing usable stored yet: seed the store with the default schedule.
    save_pickle_obj(jobCron,'jobCron')

queryDatas = {'temp':{},
              'timeStamp':{},
              'humid':{}}


def _template_data():
    """Return the context dictionary shared by every rendered template."""
    return {
        'devices' : devices,
        'sensors' : sensors,
        'jobCron': jobCron,
        'queryDatas':queryDatas
    }


# Put the device dictionary into the template data dictionary:
templateData = _template_data()


def _switch_scheduled_devices(turn_on):
    """Switch every schedule-enabled 'on/off' device and persist the result.

    Args:
        turn_on: True publishes "on" and stores state 'True'; False publishes
            "off" and stores state 'False'.
    """
    state = 'True' if turn_on else 'False'
    command = "on" if turn_on else "off"
    for name, entry in devices.items():
        if entry['type'] == 'on/off' and entry['time_enable'] == 'True':
            entry['state'] = state
            mqtt_client.publish(entry['topic'], command)
            print(name+' '+entry['time_enable'])
    _persist(devices, 'devices')


@scheduler.task('cron', id='do_job_on', hour=jobCron['startHour'], minute=jobCron['startMinute'])
def jobOn():
    """Cron job: turn on all devices that have the timer enabled."""
    print('Job on executed')
    _switch_scheduled_devices(True)


@scheduler.task('cron', id='do_job_off', hour=jobCron['stopHour'], minute=jobCron['stopMinute'])
def jobOff():
    """Cron job: turn off all devices that have the timer enabled."""
    print('Job off executed')
    _switch_scheduled_devices(False)


@scheduler.task('cron', id='do_job_DB', minute='*')
def jobDB():
    """Cron job (every minute): log temperature/humidity of reporting devices."""
    print('Job DB executed')
    for entry in devices.values():
        if entry['th_enable'] == 'True':
            logDB(entry['topic'], entry['temp'], entry['humid'], "DHT", "true")
    _persist(devices, 'devices')


@scheduler.task('cron', id='do_job_ClrDB', day='*')
def jobClrDB():
    """Cron job (daily): purge old rows from the database."""
    print('Job ClrDB executed')
    clearOldDB()


scheduler.init_app(app)
scheduler.start()


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    """Subscribe to the topics of interest once the broker accepts the connection."""
    print("Connected with result code "+str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("fl1/#")# + single-level wild card, # multi-level wild card,
    client.subscribe("fl2/#")
    client.subscribe("fl3/#")
    client.subscribe("#")


def whx(ownval,owndicts):
    """Return the key of the first record in *owndicts* holding *ownval*.

    Args:
        ownval: value to look for among each record's values.
        owndicts: mapping of key -> record (dict).

    Returns:
        The matching key, or the string 'none' when no record contains it.
    """
    for key in owndicts:
        if ownval in owndicts[key].values():
            return key
    return 'none'


def _handle_device_message(topic, payload):
    """Apply a 'fl<n>/...' message (temperature, humidity or feedback) to its device."""
    parts = topic.split("/")
    if len(parts) < 4:
        print("mqtt message.topic error")
        return
    # The first four segments identify the device; the rest names the reading.
    device_key = whx("/".join(parts[:4]), devices)
    if device_key == 'none':
        # No device is registered for this topic; without this guard the
        # index lookup below raises ValueError inside the MQTT callback.
        print("unknown topic: "+topic)
        return
    entry = devices[device_key]
    position = list(devices).index(device_key)
    if "/temperature" in topic:
        entry['th_enable'] = 'True'
        entry['temp'] = payload
        socketio.emit('temp' , {'data': entry['temp'],'pos':position})
    if "/humidity" in topic:
        entry['th_enable'] = 'True'
        entry['humid'] = payload
        socketio.emit('humid' , {'data': entry['humid'],'pos':position})
    if "/feedback" in topic:
        reported = 'True' if "on" in payload else 'False'
        # Only notify the browsers when the reported state actually changed.
        if entry['state'] != reported:
            entry['state'] = reported
            socketio.emit('refresh' , {})


def _handle_tasmota_message(topic, payload):
    """Print the ENERGY section of a 'tele/<name>/SENSOR' JSON report."""
    parts = topic.split("/")  # [0]:tele,[1]:name,[2]:classify
    if len(parts) < 3 or parts[2] != "SENSOR":
        return
    try:
        report = json.loads(payload)
    except ValueError:
        print("invalid SENSOR payload on "+topic)
        return
    if isinstance(report, dict):
        print(report.get("ENERGY"))


# The callback for when a PUBLISH message is received from the ESP8266.
def on_message(client, userdata, message):
    """Dispatch an incoming MQTT message and persist the device table."""
    # The "#" subscription can deliver arbitrary payloads, so never let an
    # undecodable byte raise out of the callback.
    payload = message.payload.decode('utf8', errors='replace')
    topic = message.topic
    print("Received message '" + payload + "' on topic '"+ topic + "' with QoS " + str(message.qos))
    if topic.startswith('fl'):
        _handle_device_message(topic, payload)
    elif topic.startswith('tele'):
        _handle_tasmota_message(topic, payload)
    else:
        print("unknown topic: "+topic)
    _persist(devices, 'devices')


mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect("127.0.0.1",1883,60)
# mqtt_client.connect("192.168.2.46",1883,60)
mqtt_client.loop_start()


@app.route("/",methods=['GET', 'POST'])
def main():
    """Render the dashboard on GET; POST is accepted but does nothing."""
    if request.method == 'GET':
        queryDatas['temp'] = queryDB("temp")
        queryDatas['humid'] = queryDB("humid")
        queryDatas['timeStamp'] = queryDB("timeStamp")
        _persist(devices, 'devices')
        # Pass the template data into the template main.html and return it to the user
        return render_template('main.html', async_mode=socketio.async_mode, **_template_data())
    elif request.method == 'POST':
        return 'post method do nothing'
    else:
        return 'method error'


@app.route("/debug")
def debug():
    """Render the debug page with the current application state."""
    return render_template('debug.html', async_mode=socketio.async_mode, **_template_data())


# The function below is executed when someone requests a URL with the device number and action in it:
@app.route("/<device>/<floor>/<position>/<object>/<cmd>/<ctrl>",methods = ['POST', 'GET'])
def action(device, floor, position, object, cmd, ctrl):
    """Send an on/off/toggle/click command to *device* and re-render the dashboard.

    Only *device* and *ctrl* are used; the other URL segments are accepted
    for routing and ignored. An unknown *ctrl* publishes nothing. An unknown
    *device* yields a 404 instead of an unhandled KeyError.
    """
    entry = devices.get(device)
    if entry is None:
        return 'unknown device', 404
    if ctrl == "toggle":
        # Resolve toggle to the concrete command opposite to the stored state.
        ctrl = "off" if entry['state'] == 'True' else "on"
    if ctrl == "on":
        mqtt_client.publish(entry['topic'],"on")
        entry['state'] = 'True'
    elif ctrl == "off":
        mqtt_client.publish(entry['topic'],"off")
        entry['state'] = 'False'
    elif ctrl == "click":
        mqtt_client.publish(entry['topic'],"click")
        print('click '+entry['topic'])
        entry['state'] = 'False'
    _persist(devices, 'devices')
    return render_template('main.html', **_template_data())


def _set_time_enable(device, value):
    """Store *value* ('True'/'False') as the device's timer flag and re-render."""
    entry = devices.get(device)
    if entry is None:
        return 'unknown device', 404
    print('time_enable '+entry['time_enable'])
    entry['time_enable'] = value
    _persist(devices, 'devices')
    return render_template('main.html', **_template_data())


@app.route("/addSched/<device>")
def addSched(device):
    """Enable the on/off timer for *device*."""
    return _set_time_enable(device, 'True')


@app.route("/rmSched/<device>")
def rmSched(device):
    """Disable the on/off timer for *device*."""
    return _set_time_enable(device, 'False')


def _reschedule_from_form(job_id, prefix):
    """Reschedule cron job *job_id* from the posted '<prefix>Time1/2' fields.

    jobCron is updated only when the scheduler accepted the new trigger, so
    the stored schedule always matches the running one.
    """
    hour = str(request.form[prefix + 'Time1'])
    minute = str(request.form[prefix + 'Time2'])
    for job in scheduler.get_jobs():
        print(job.id)
    try:
        scheduler.scheduler.reschedule_job(job_id, trigger='cron', hour=hour, minute=minute)
    except Exception as exc:
        # Invalid cron fields or a missing job: keep the previous schedule.
        print("reschedule of " + job_id + " failed: " + str(exc))
    else:
        jobCron[prefix + 'Hour'] = hour
        jobCron[prefix + 'Minute'] = minute


@app.route('/startTime',methods = ['POST', 'GET'])
def startTime():
    """Update the daily switch-on time from the posted form and re-render."""
    if request.method == 'POST':
        print(request.form)
        _reschedule_from_form('do_job_on', 'start')
    _persist(devices, 'devices')
    _persist(jobCron, 'jobCron')
    return render_template('main.html', **_template_data())


@app.route('/stopTime',methods = ['POST', 'GET'])
def stopTime():
    """Update the daily switch-off time from the posted form and re-render."""
    if request.method == 'POST':
        _reschedule_from_form('do_job_off', 'stop')
    _persist(devices, 'devices')
    _persist(jobCron, 'jobCron')
    return render_template('main.html', **_template_data())


@app.route("/videoStream")
def videoStream():
    """Render the page that embeds the camera feed."""
    # return the rendered template
    return render_template("videoStream.html")


def detect_motion(frameCount):
    """Read camera frames forever, mark detected motion and publish each frame.

    Args:
        frameCount: number of frames fed to the background model before
            motion detection starts.
    """
    # grab global references to the video stream, output frame, and
    # lock variables
    global vs, outputFrame, lock
    # initialize the motion detector and the total number of frames
    # read thus far
    md = SingleMotionDetector(accumWeight=0.1)
    total = 0
    # loop over frames from the video stream
    while True:
        frame = vs.read()
        if frame is None:
            # Camera has not delivered a frame; wait briefly instead of
            # crashing the thread in imutils.resize.
            time.sleep(0.01)
            continue
        # resize the frame, convert it to grayscale, and blur it
        frame = imutils.resize(frame, width=400)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (7, 7), 0)
        # grab the current timestamp and draw it on the frame
        timestamp = datetime.datetime.now()
        cv2.putText(frame, timestamp.strftime("%A %d %B %Y %I:%M:%S%p"),
                    (10, frame.shape[0] - 10),cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 255), 1)
        # once enough frames have been read to build a reasonable
        # background model, look for motion
        if total > frameCount:
            motion = md.detect(gray)
            if motion is not None:
                # unpack the tuple and draw the box surrounding the
                # "motion area" on the output frame
                (thresh, (minX, minY, maxX, maxY)) = motion
                cv2.rectangle(frame, (minX, minY), (maxX, maxY),(0, 0, 255), 2)
        # update the background model and increment the total number
        # of frames read thus far
        md.update(gray)
        total += 1
        # publish the annotated frame under the lock
        with lock:
            outputFrame = frame.copy()


def generate():
    """Yield the latest output frame as an endless multipart JPEG stream."""
    # grab global references to the output frame and lock variables
    global outputFrame, lock
    while True:
        # Hold the lock only long enough to snapshot the frame so that the
        # capture thread is not blocked while the JPEG is encoded.
        with lock:
            frame = None if outputFrame is None else outputFrame.copy()
        if frame is None:
            # No frame produced yet: back off instead of spinning on the lock.
            time.sleep(0.01)
            continue
        # encode the frame in JPEG format
        (flag, encodedImage) = cv2.imencode(".jpg", frame)
        # ensure the frame was successfully encoded
        if not flag:
            continue
        # yield the output frame in the byte format
        yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encodedImage) + b'\r\n')


@app.route("/video_feed")
def video_feed():
    """Stream the camera frames as multipart/x-mixed-replace."""
    # return the response generated along with the specific media
    # type (mime type)
    return Response(generate(),mimetype = "multipart/x-mixed-replace; boundary=frame")


@socketio.on('my event')
def handle_my_custom_event(json):
    """Accept the client's 'my event' message; the payload is ignored.

    The parameter shadows the json module inside this function only.
    """
    #print('received json data here: ' + str(json))
    pass


# function for responses
def results():
    """Handle a webhook POST: log the request body and trigger the garage latch."""
    # get_json(silent=True) returns None for a missing or invalid body.
    req = request.get_json(silent=True, force=True) or {}
    print("Request:")
    print(json.dumps(req, indent=4))
    # fetch fulfillmentText from json (currently unused by the response)
    fulfillmentText = (req.get('queryResult') or {}).get('fulfillmentText')
    mqtt_client.publish("fl1/garage/door/02","slide")
    # return a fulfillment response
    #return {'fulfillmentText': 'This is a response from webhook.'}
    return 'results'


@app.route('/webhook',methods=['GET','POST'])
def webhook():
    """Webhook endpoint: POST is processed by results(), GET answers 'Hello'."""
    if request.method == 'POST':
        return results()
    else:
        return 'Hello'


if __name__ == "__main__":
    t = threading.Thread(target=detect_motion, args=(64,))
    t.daemon = True
    t.start()
    # NOTE(review): debug=True on 0.0.0.0 exposes the interactive debugger to
    # the whole network; disable it outside development.
    socketio.run(app, host='0.0.0.0', port=80, debug=True, use_reloader=False)

# release the video stream pointer...

path.py

Source:path.py

`...158 List of devices ordered by coordinates159 """160 return sorted(self.devices, key=lambda dev: dev.md.z)161 @property162 def blocking_devices(self):163 """164 A list of devices that are currently inserted or are in unknown165 positions. This includes devices downstream of the first166 :attr:`.impediment`167 """168 # Cache important prior devices169 prior = None170 last_branches = list()171 block = list()172 for device in self.path:173 # If we have switched beamlines174 if prior and device.md.beamline != prior.md.beamline:175 # Find improperly configured optics176 for optic in last_branches:177 # If this optic is responsible for delivering beam178 # to this hutch and it is not configured to do so.179 # Mark it as blocking180 if device.md.beamline in optic.branches:181 if device.md.beamline not in optic.destination:182 block.append(optic)183 # Otherwise ensure it is removed from the beamline184 elif optic.md.beamline not in optic.destination:185 block.append(optic)186 # Clear optics that have been evaluated187 last_branches.clear()188 # If our last device was an optic, make sure it wasn't required189 # to continue along this beampath190 elif (prior in last_branches191 and device.md.beamline in prior.branches192 and device.md.beamline not in prior.destination):193 block.append(last_branches.pop(-1))194 # Find branching devices and store195 # They will be marked as blocking by downstream devices196 dev_state = find_device_state(device)197 if device in self.branches:198 last_branches.append(device)199 # Find inserted devices200 elif dev_state == DeviceState.Inserted:201 # Ignore devices with low enough transmssion202 trans = getattr(device, 'transmission', 1)203 if trans < self.minimum_transmission:204 block.append(device)205 # Find unknown and faulted devices206 elif dev_state != DeviceState.Removed:207 block.append(device)208 # Stache our prior device209 prior = device210 return block211 @property212 def incident_devices(self):213 """214 A list of devices the 
beam is currently incident on. This includes the215 current :attr:`.impediment` and any upstream devices that may be216 inserted but have more transmission than :attr:`.minimum_transmission`217 """218 # Find device information219 inserted = [d for d in self.path220 if find_device_state(d) == DeviceState.Inserted]221 impediment = self.impediment222 # No blocking devices, all inserted devices incident223 if not impediment:224 return inserted225 # Otherwise only return upstream of the impediment226 return [d for d in inserted if d.md.z <= impediment.md.z]227 def show_devices(self, file=None):228 """229 Print a table of the devices along the beamline230 Parameters231 ----------232 file : file-like object233 File to writable234 """235 # Initialize Table236 pt = PrettyTable(['Name', 'Prefix', 'Position', 'Beamline', 'State'])237 # Adjust Table settings238 pt.align = 'r'239 pt.align['Name'] = 'l'240 pt.align['Prefix'] = 'l'241 pt.float_format = '8.5'...`

vscsi_util.py

Source:vscsi_util.py

`...136 devname = sg137 scsi_id = _vscsi_get_scsiid(sg)138 devices.append([hctl, devname, sg, scsi_id])139 return devices140def vscsi_get_scsidevices(mask="*"):141 """ get all scsi devices information """142 devices = _vscsi_get_scsidevices_by_lsscsi("[%s]" % mask)143 if devices or (len(mask) and mask[0] != "*"):144 # devices found or partial device scan145 return devices146 return _vscsi_get_scsidevices_by_sysfs()147def vscsi_get_hctl_and_devname_by(target, scsi_devices = None):148 if target.startswith('/dev/'):149 target = os.path.realpath(target)150 if scsi_devices is None:151 if len(target.split(':')) == 4:152 scsi_devices = _vscsi_get_scsidevices_by_lsscsi(target)153 elif target.startswith('/dev/'): 154 scsi_devices = _vscsi_get_scsidevices_by_lsscsi("| grep %s" % target)155 else:156 scsi_devices = _vscsi_get_scsidevices_by_lsscsi("")157 if not scsi_devices:158 scsi_devices = _vscsi_get_scsidevices_by_sysfs()159 if len(target.split(':')) == 4:160 return _vscsi_get_devname_by(target, scsi_devices)161 else:162 return _vscsi_get_hctl_by(target, scsi_devices)163def get_scsi_vendor(pHCTL):164 try:165 sysfs_mnt = utils.find_sysfs_mount() 166 sysfs_scsi_dev_path = \167 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)168 scsi_vendor = \169 os.popen('cat ' + sysfs_scsi_dev_path + \170 SYSFS_SCSI_DEV_VENDOR_PATH).read()171 return scsi_vendor.splitlines()[0]172 except:173 return None174def get_scsi_model(pHCTL):175 try:176 sysfs_mnt = utils.find_sysfs_mount() 177 sysfs_scsi_dev_path = \178 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)179 scsi_model = \180 os.popen('cat ' + sysfs_scsi_dev_path + \181 SYSFS_SCSI_DEV_MODEL_PATH).read()182 return scsi_model.splitlines()[0]183 except:184 return None185def get_scsi_typeid(pHCTL):186 try:187 sysfs_mnt = utils.find_sysfs_mount() 188 sysfs_scsi_dev_path = \189 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)190 scsi_typeid = \191 os.popen('cat ' + sysfs_scsi_dev_path + \192 SYSFS_SCSI_DEV_TYPEID_PATH).read()193 return 
int(scsi_typeid.splitlines()[0])194 except:195 return None196def get_scsi_revision(pHCTL):197 try:198 sysfs_mnt = utils.find_sysfs_mount() 199 sysfs_scsi_dev_path = \200 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)201 scsi_revision = \202 os.popen('cat ' + sysfs_scsi_dev_path + \203 SYSFS_SCSI_DEV_REVISION_PATH).read()204 return scsi_revision.splitlines()[0]205 except:206 return None207def get_scsi_scsilevel(pHCTL):208 try:209 sysfs_mnt = utils.find_sysfs_mount() 210 sysfs_scsi_dev_path = \211 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)212 scsi_scsilevel = \213 os.popen('cat ' + sysfs_scsi_dev_path + \214 SYSFS_SCSI_DEV_SCSILEVEL_PATH).read()215 return int(scsi_scsilevel.splitlines()[0])216 except:217 return None218def _make_scsi_record(scsi_info):219 scsi_rec = {220 'physical_HCTL': scsi_info[0],221 'dev_name': None,222 'sg_name': scsi_info[2],223 'scsi_id': None224 }225 if scsi_info[1] is not None:226 scsi_rec['dev_name'] = scsi_info[1] 227 if scsi_info[3] is not None:228 scsi_rec['scsi_id'] = scsi_info[3] 229 scsi_rec['vendor_name'] = \230 get_scsi_vendor(scsi_rec['physical_HCTL'])231 scsi_rec['model'] = \232 get_scsi_model(scsi_rec['physical_HCTL'])233 scsi_rec['type_id'] = \234 get_scsi_typeid(scsi_rec['physical_HCTL'])235 scsi_rec['revision'] = \236 get_scsi_revision(scsi_rec['physical_HCTL'])237 scsi_rec['scsi_level'] = \238 get_scsi_scsilevel(scsi_rec['physical_HCTL'])239 try:240 lsscsi_info = os.popen('lsscsi %s 2>/dev/null' % scsi_rec['physical_HCTL']).read().split()241 scsi_rec['type'] = lsscsi_info[1]242 except:243 scsi_rec['type'] = None244 return scsi_rec245def get_scsi_device(pHCTL):246 scsis_info = _vscsi_get_scsidevices_by_lsscsi(pHCTL)247 if not scsis_info:248 scsis_info = _vscsi_get_scsidevices_by_sysfs()249 for scsi_info in scsis_info:250 if scsi_info[0] == pHCTL:251 return _make_scsi_record(scsi_info)252 return None253def get_all_scsi_devices(mask="*"):254 scsi_records = []255 for scsi_info in vscsi_get_scsidevices(mask):256 
scsi_record = _make_scsi_record(scsi_info)257 scsi_records.append(scsi_record)...`

## Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.