Best Python code snippet using ATX
all_reduce.py
Source:all_reduce.py  
1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#     http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14# ==============================================================================15"""Utilities to construct a TF subgraph implementing distributed All-Reduce."""16from __future__ import absolute_import17from __future__ import division18from __future__ import print_function19import collections20import math21from tensorflow.contrib import nccl22from tensorflow.python.framework import device as device_lib23from tensorflow.python.framework import ops24from tensorflow.python.ops import array_ops25from tensorflow.python.ops import math_ops26def _flatten_tensors(tensors):27  """Check tensors for isomorphism and flatten.28  Args:29    tensors: list of T @{tf.Tensor} which must all have the same shape.30  Returns:31    tensors: a list of T @{tf.Tensor} which are flattened (1D) views of tensors32    shape: the original shape of each element of input tensors33  Raises:34    ValueError: tensors are empty or non-isomorphic or have unknown shape.35  """36  if not tensors:37    raise ValueError("tensors cannot be empty")38  shape = tensors[0].shape39  for tensor in tensors:40    shape = shape.merge_with(tensor.shape)41  if not shape.is_fully_defined():42    raise ValueError("Tensors must have statically known shape.")43  if len(shape) != 1:44    reshaped = []45    for t in tensors:46      with ops.colocate_with(t):47        reshaped.append(array_ops.reshape(t, [-1]))48    tensors = reshaped49  return tensors, shape50def _reshape_tensors(tensors, shape):51  """Reshape tensors flattened by _flatten_tensors.52  Args:53    tensors: list of T @{tf.Tensor} of identical length 1D tensors.54    shape: list of integers describing the desired shape.  Product of55      the elements must equal the length of each tensor.56  Returns:57    list of T @{tf.Tensor} which are the reshaped inputs.58  """59  reshaped = []60  for t in tensors:61    with ops.colocate_with(t):62      reshaped.append(array_ops.reshape(t, shape))63  return reshaped64def _padded_split(tensor, pieces):65  """Like split for 1D tensors but pads-out case where len % pieces != 0.66  Args:67    tensor: T @{tf.Tensor} that must be 1D.68    pieces: a positive integer specifying the number of pieces into which69      tensor should be split.70  Returns:71    list of T @{tf.Tensor} of length pieces, which hold the values of72      thin input tensor, in order.  The final tensor may73      be zero-padded on the end to make its size equal to those of all74      of the other tensors.75  Raises:76    ValueError: The input tensor is not 1D.77  """78  shape = tensor.shape79  if 1 != len(shape):80    raise ValueError("input tensor must be 1D")81  tensor_len = shape[0].value82  with ops.colocate_with(tensor):83    if tensor_len % pieces != 0:84      # pad to an even length85      chunk_size = 1 + tensor_len // pieces86      if pieces > tensor_len:87        # This is an edge case that should not come up in practice,88        # i.e. a different reduction algorithm would be better,89        # but we'll make it work just for completeness.90        pad_len = pieces - tensor_len91        extended_whole = array_ops.concat(92            [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)93        parts = array_ops.split(extended_whole, pieces)94        return parts, pad_len95      elif (pieces - 1) * chunk_size >= tensor_len:96        # Another edge case of limited real interest.97        pad_len = (pieces * chunk_size) % tensor_len98        extended_whole = array_ops.concat(99            [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)100        parts = array_ops.split(extended_whole, pieces)101        return parts, pad_len102      else:103        last_chunk_size = tensor_len - (pieces - 1) * chunk_size104        pad_len = chunk_size - last_chunk_size105        piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]106        parts = array_ops.split(tensor, piece_lens)107        parts[-1] = array_ops.concat(108            [parts[-1], array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)109        return parts, pad_len110    else:111      return array_ops.split(tensor, pieces), 0112def _strip_padding(tensors, pad_len):113  """Strip the suffix padding added by _padded_split.114  Args:115    tensors: list of T @{tf.Tensor} of identical length 1D tensors.116    pad_len: number of elements to be stripped from the end of each tensor.117  Returns:118    list of T @{tf.Tensor} which are the stripped inputs.119  Raises:120    ValueError: tensors must be a non-empty list of 1D tensors, and121      each must be longer than pad_len.122  """123  if not tensors:124    raise ValueError("tensors cannot be empty")125  shape = tensors[0].shape126  if len(shape) > 1:127    raise ValueError("tensors must be 1D")128  prefix_len = int(shape[0] - pad_len)129  if prefix_len < 0:130    raise ValueError("pad_len longer than tensor")131  stripped = []132  for t in tensors:133    with ops.colocate_with(t):134      stripped.append(array_ops.slice(t, [0], [prefix_len]))135  return stripped136def _ragged_split(tensor, pieces):137  """Like split for 1D tensors but allows case where len % pieces != 0.138  Args:139    tensor: T @{tf.Tensor} that must be 1D.140    pieces: a positive integer specifying the number of pieces into which141      tensor should be split.142  Returns:143    list of T @{tf.Tensor} of length pieces, which hold the values of144      the input tensor, in order.  The final tensor may be shorter145      than the others, which will all be of equal length.146  Raises:147    ValueError: input tensor must be 1D.148  """149  shape = tensor.shape150  if 1 != len(shape):151    raise ValueError("input tensor must be 1D")152  tensor_len = shape[0].value153  chunk_size = tensor_len // pieces154  with ops.colocate_with(tensor):155    if tensor_len != (pieces * chunk_size):156      # last piece will be short157      assert pieces > 1158      last_chunk_size = tensor_len - ((pieces - 1) * chunk_size)159      assert last_chunk_size > 0160      piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]161      return array_ops.split(tensor, piece_lens)162    else:163      return array_ops.split(tensor, pieces)164def _ring_permutations(num_workers, num_subchunks, gpu_perm):165  """"Generate an array of device index arrays, one for each subchunk.166  In the basic ring reduction algorithm there are size(T)/num_devices167  data chunks and each device process one chunk per tick, i.e. sending168  one chunk and receiving one chunk.  The idea of subchunking is that169  each device processes num_subchunks smaller data regions per tick,170  and the ring rank permutation is different for each subchunk index171  so that a device is potentially sending to and receiving from172  num_subchunks different other devices at each tick.  Where multiple173  independent data channels exist between devices, this strategy174  supplies a method of using them in parallel.175  Args:176    num_workers: number of worker tasks177    num_subchunks: number of subchunks into which to divide each per-GPU chunk.178    gpu_perm: an array of integers in [0, num_gpus-1] giving the default179      ring order of GPUs at each worker.  Other permutations will be generated180      by rotating this array and splicing together per-worker instances.181  Raises:182    ValueError: the number of subchunks may not exceed the number of GPUs.183  Returns:184    pred_by_s_d: list of lists that maps (by index) from (subchunk, dev) to185        preceding device in the permutation for that subchunk.  The186        device index of GPU i at worker j is i + (j * num_gpus).187    rank_by_s_d: list of lists that maps (by index) from (subchunk, dev) to188       local rank of device d in the permutation for that subchunk.189  """190  num_gpus = len(gpu_perm)191  devices = num_workers * num_gpus192  if devices == 0:193    return [], []194  if num_subchunks > num_gpus:195    raise ValueError(196        "num_subchunks %d must be <= num_gpus %d" % (num_subchunks, num_gpus))197  rotation_interval = max(1, int(num_gpus / num_subchunks))198  perms_by_s = []199  for s in range(0, num_subchunks):200    full_order = []201    offset = s * rotation_interval202    for w in range(0, num_workers):203      default_order = [(w * num_gpus) + i for i in gpu_perm]204      dev_order = default_order[offset:] + default_order[:offset]205      full_order += dev_order206    perms_by_s.append(full_order)207  pred_by_s_d = [[-1 for d in range(0, devices)]208                 for s in range(0, num_subchunks)]209  rank_by_s_d = [[-1 for d in range(0, devices)]210                 for s in range(0, num_subchunks)]211  for s in range(0, num_subchunks):212    for d in range(0, devices):213      for t in range(0, devices):214        if d == perms_by_s[s][t]:215          rank_by_s_d[s][d] = t216          pred_by_s_d[s][d] = perms_by_s[s][(t + devices - 1) % devices]217          break218  return (pred_by_s_d, rank_by_s_d)219def build_ring_all_reduce(input_tensors, num_workers, num_subchunks,220                          gpu_perm, red_op, un_op=None):221  """Construct a subgraph performing a ring-style all-reduce of input_tensors.222  Args:223    input_tensors: a list of T @{tf.Tensor} objects, which must all224      have the same shape and type.225    num_workers: number of worker tasks spanned by input_tensors.226    num_subchunks: number of subchunks each device should process in one tick.227    gpu_perm: a list of ints giving a ring-wise rank ordering of GPUs at228      each worker.  All workers must have the same number of229      GPUs with the same rank ordering.  If NVLINK is available, this should230      be a ring order supported by NVLINK edges.231    red_op: a binary operator for elementwise reduction.232    un_op: an optional unary operator to apply to fully reduced values.233  Raises:234    ValueError: empty input_tensors or they don't all have same235    size.236  Returns:237    a list of T @{tf.Tensor} identical sum-reductions of input_tensors.238  """239  if len(input_tensors) < 2:240    raise ValueError("input_tensors must be length 2 or longer")241  input_tensors, shape = _flatten_tensors(input_tensors)242  devices = [t.device for t in input_tensors]243  (pred_by_s_d, rank_by_s_d) = _ring_permutations(244      num_workers, num_subchunks, gpu_perm)245  chunks_by_dev, pad_len = _build_ring_gather(246      input_tensors, devices,247      num_subchunks, pred_by_s_d, rank_by_s_d, red_op)248  if un_op:249    chunks_by_dev = _apply_unary_to_chunks(un_op, chunks_by_dev)250  output_tensors = _build_ring_scatter(pred_by_s_d, rank_by_s_d,251                                       chunks_by_dev)252  if pad_len > 0:253    output_tensors = _strip_padding(output_tensors, pad_len)254  if len(shape) != 1:255    output_tensors = _reshape_tensors(output_tensors, shape)256  return output_tensors257def _build_ring_gather(input_tensors, devices, num_subchunks,258                       pred_by_s_d, rank_by_s_d, red_op):259  """Construct a subgraph for the first (reduction) pass of ring all-reduce.260  Args:261    input_tensors: a list of T @{tf.Tensor} 1D input tensors of same262      shape and type.263    devices: array of device name strings264    num_subchunks: number of subchunks each device should process in one tick.265    pred_by_s_d: as produced by _ring_permutations266    rank_by_s_d: as produced by _ring_permutations267    red_op: a binary operator for elementwise reduction268  Raises:269    ValueError: tensors must all be one dimensional.270  Returns:271    list of list of T @{tf.Tensor} of (partially) reduced values where272    exactly num_subchunks chunks at each device are fully reduced.273  """274  num_devices = len(input_tensors)275  if num_devices == 0:276    return []277  if num_devices == 1:278    return input_tensors279  shape = input_tensors[0].shape280  if 1 != len(shape):281    raise ValueError("input tensors must be 1D")282  num_chunks = num_devices * num_subchunks283  num_ticks = num_devices - 1284  # Initialize chunks_by_dev with splits of the input tensors.285  chunks_by_dev = []286  split_pad_len = 0287  for d in range(0, num_devices):288    with ops.device(devices[d]):289      splits, split_pad_len = _padded_split(input_tensors[d], num_chunks)290      chunks_by_dev.append(splits)291  # Reduction phase292  for tick in range(0, num_ticks):293    # One new partial reduction for every chunk294    new_partial_reductions = [None for _ in range(0, num_chunks)]295    # Compute reductions with respect to last tick's values296    for d in range(0, num_devices):297      with ops.device(devices[d]):298        for s in range(0, num_subchunks):299          rank = rank_by_s_d[s][d]300          seg_index = (rank + num_devices - (2 + tick)) % num_devices301          pred_dev = pred_by_s_d[s][d]302          chunk_index = (seg_index * num_subchunks) + s303          new_partial_reductions[chunk_index] = red_op(304              chunks_by_dev[pred_dev][chunk_index],305              chunks_by_dev[d][chunk_index])306    # Update chunks_by_dev with the new values at the end of the tick.307    for d in range(0, num_devices):308      for s in range(0, num_subchunks):309        rank = rank_by_s_d[s][d]310        seg_index = (rank + num_devices - (2 + tick)) % num_devices311        chunk_index = (seg_index * num_subchunks) + s312        chunks_by_dev[d][chunk_index] = new_partial_reductions[chunk_index]313  return chunks_by_dev, split_pad_len314def _apply_unary_to_chunks(f, chunks_by_dev):315  """Apply a unary op to each tensor in chunks_by_dev, on same device.316  Args:317    f: a unary function over T @{tf.Tensor}.318    chunks_by_dev: list of lists of T @{tf.Tensor}.319  Returns:320    new list of lists of T @{tf.Tensor} with the same structure as321    chunks_by_dev containing the derived tensors.322  """323  output = []324  for x in chunks_by_dev:325    with ops.colocate_with(x[0]):326      output.append([f(t) for t in x])327  return output328def _build_ring_scatter(pred_by_s_d, rank_by_s_d,329                        chunks_by_dev):330  """Construct subgraph for second (scatter) pass of ring all-reduce.331  Args:332    pred_by_s_d: as produced by _ring_permutations333    rank_by_s_d: as produced by _ring_permutations334    chunks_by_dev: list of list of T @{tf.Tensor} indexed by ints335      (device, chunk)336  Raises:337    ValueError: chunks_by_dev is not well-formed338  Returns:339    list of T @{tf.Tensor} which are the fully reduced tensors, one340    at each device corresponding to the outer dimension of chunks_by_dev.341  """342  num_devices = len(chunks_by_dev)343  num_chunks = len(chunks_by_dev[0])344  if 0 != num_chunks % num_devices:345    raise ValueError(346        "Expect number of chunks per device to be divisible by num_devices")347  num_subchunks = int(num_chunks / num_devices)348  num_ticks = num_devices - 1349  for tick in range(0, num_ticks):350    passed_values = [None for _ in range(0, num_chunks)]351    for d in range(0, num_devices):352      with ops.colocate_with(chunks_by_dev[d][0]):353        for s in range(0, num_subchunks):354          rank = rank_by_s_d[s][d]355          seg_index = (rank + num_devices - (1 + tick)) % num_devices356          pred_dev = pred_by_s_d[s][d]357          chunk_index = (seg_index * num_subchunks) + s358          passed_values[chunk_index] = array_ops.identity(359              chunks_by_dev[pred_dev][chunk_index])360    for d in range(0, num_devices):361      for s in range(0, num_subchunks):362        rank = rank_by_s_d[s][d]363        seg_index = (rank + num_devices - (1 + tick)) % num_devices364        chunk_index = (seg_index * num_subchunks) + s365        chunks_by_dev[d][chunk_index] = passed_values[chunk_index]366  # Join chunks at each device.367  output = []368  for x in chunks_by_dev:369    with ops.colocate_with(x[0]):370      output.append(array_ops.concat(x, 0))371  return output372def build_recursive_hd_all_reduce(input_tensors, red_op, un_op=None):373  """Construct a subgraph for recursive halving-doubling all-reduce.374  The recursive halving-doubling algorithm is described in375  http://www.mcs.anl.gov/~thakur/papers/ijhpca-coll.pdf376  The concept is to arrange the participating n devices in377  a linear sequence where devices exchange data pairwise378  with one other device in each round.  During the gather379  phase there are lg(n) rounds where devices exchange380  increasingly smaller sub-tensors with another device381  at increasingly greater distances, until at the top382  each device has 1/n of the fully reduced values.  During the383  scatter phase each device exchanges its fully reduced384  sub-tensor (which doubles in length at each round)385  with one other device at increasingly smaller distances386  until each device has all of the fully reduced values.387  Note: this preliminary version requires that len(input_tensors) be a388    power of 2.  TODO(tucker): relax this restriction.  Also, the389    number of elements in each tensor must be divisible by 2^h where h390    is the number of hops in each phase.  This will also be relaxed in391    the future with edge-case specific logic.392  Args:393    input_tensors: list of T @{tf.Tensor} to be elementwise reduced.394    red_op: a binary elementwise reduction Op.395    un_op: an optional unary elementwise Op to apply to reduced values.396  Returns:397    list of T @{tf.Tensor} which are the fully reduced tensors, one398    at each device of input_tensors.399  Raises:400    ValueError: num_devices not a power of 2, or tensor len not divisible401    by 2 the proper number of times.402  """403  devices = [t.device for t in input_tensors]404  input_tensors, shape = _flatten_tensors(input_tensors)405  reduced_shards = _build_recursive_hd_gather(input_tensors, devices, red_op)406  if un_op:407    reduced_shards = [un_op(t) for t in reduced_shards]408  output_tensors = _build_recursive_hd_scatter(reduced_shards, devices)409  if len(shape) != 1:410    output_tensors = _reshape_tensors(output_tensors, shape)411  return output_tensors412def _build_recursive_hd_gather(input_tensors, devices, red_op):413  """Construct the gather phase of recursive halving-doubling all-reduce.414  Args:415    input_tensors: list of T @{tf.Tensor} to be elementwise reduced.416    devices: a list of strings naming the devices hosting input_tensors,417      which will also be used to host the (partial) reduction values.418    red_op: a binary elementwise reduction Op.419  Returns:420    list of T @{tf.Tensor} which are the fully reduced tensor shards.421  Raises:422    ValueError: num_devices not a power of 2, or tensor len not divisible423    by 2 the proper number of times.424  """425  num_devices = len(devices)426  num_hops = int(math.log(num_devices, 2))427  if num_devices != (2 ** num_hops):428    raise ValueError("num_devices must be a power of 2")429  chunks = input_tensors430  for h in range(0, num_hops):431    span = 2 ** h432    group_size = span * 2433    new_chunks = [[] for _ in devices]434    for d in range(0, num_devices):435      if (d % group_size) >= (group_size / 2):436        # skip right half of a pair437        continue438      left_dev = devices[d]439      right_dev = devices[d + span]440      left_split = array_ops.split(chunks[d], 2)441      right_split = array_ops.split(chunks[d+span], 2)442      with ops.device(left_dev):443        new_chunks[d] = red_op(left_split[0], right_split[0])444      with ops.device(right_dev):445        new_chunks[d + span] = red_op(left_split[1], right_split[1])446    chunks = new_chunks447  return chunks448def _build_recursive_hd_scatter(input_tensors, devices):449  """Construct the scatter phase of recursive halving-doublng all-reduce.450  Args:451    input_tensors: list of T @{tf.Tensor} that are fully-reduced shards.452    devices: a list of strings naming the devices on which the reconstituted453      full tensors should be placed.454  Returns:455    list of T @{tf.Tensor} which are the fully reduced tensors.456  """457  num_devices = len(devices)458  num_hops = int(math.log(num_devices, 2))459  assert num_devices == (2 ** num_hops), "num_devices must be a power of 2"460  chunks = input_tensors461  for h in reversed(range(0, num_hops)):462    span = 2 ** h463    group_size = span * 2464    new_chunks = [[] for _ in devices]465    for d in range(0, num_devices):466      if (d % group_size) >= (group_size / 2):467        # skip right half of a pair468        continue469      left_idx = d470      right_idx = d + span471      left_dev = devices[left_idx]472      right_dev = devices[right_idx]473      with ops.device(left_dev):474        new_chunks[left_idx] = array_ops.concat([chunks[left_idx],475                                                 chunks[right_idx]], 0)476      with ops.device(right_dev):477        new_chunks[right_idx] = array_ops.concat([chunks[left_idx],478                                                  chunks[right_idx]], 0)479    chunks = new_chunks480  return chunks481def build_shuffle_all_reduce(input_tensors, gather_devices, red_op, un_op=None):482  """Construct a subgraph for shuffle all-reduce.483  Shuffle reduce is essentially the algorithm implemented when using484  parameter servers.  Suppose tensor length is n, there are d devices485  and g gather shards.  Each device sends a n/g length sub-tensor to486  each gather shard.  The gather shards perform a reduction across d487  fragments, then broadcast the result back to each device.  The488  devices then join the g fully reduced fragments they receive from489  the shards.  The gather shards could perform d-1 pairwise490  reductions, or one d-way reduction.  The first is better where491  reduction Op time is low compared to transmission time, the second492  better in the other case.493  Args:494    input_tensors: list of T @(tf.Tensor} values to be reduced.495    gather_devices: list of names of devices on which reduction shards496      should be placed.497    red_op: an n-array elementwise reduction Op498    un_op: optional elementwise unary Op to be applied to fully-reduced values.499  Returns:500    list of T @{tf.Tensor} which are the fully reduced tensors.501  """502  input_tensors, shape = _flatten_tensors(input_tensors)503  dst_devices = [t.device for t in input_tensors]504  reduced_shards = _build_shuffle_gather(input_tensors, gather_devices,505                                         red_op, un_op)506  output_tensors = _build_shuffle_scatter(reduced_shards, dst_devices)507  if len(shape) != 1:508    output_tensors = _reshape_tensors(output_tensors, shape)509  return output_tensors510def _build_shuffle_gather(input_tensors, gather_devices, red_op, un_op=None):511  """Construct the gather (concentrate and reduce) phase of shuffle all-reduce.512  Args:513    input_tensors: list of T @(tf.Tensor} values to be reduced.514    gather_devices: list of names of devices on which reduction shards515      should be placed.516    red_op: the binary reduction Op517    un_op: optional elementwise unary Op to be applied to fully-reduced values.518  Returns:519    list of T @{tf.Tensor} which are the fully reduced shards.520  Raises:521    ValueError: inputs not well-formed.522  """523  num_source_devices = len(input_tensors)524  num_gather_devices = len(gather_devices)525  shape = input_tensors[0].shape526  if len(shape) != 1:527    raise ValueError("input_tensors must be 1D")528  shards_by_source = []529  for d in range(0, num_source_devices):530    with ops.colocate_with(input_tensors[d]):531      shards_by_source.append(532          _ragged_split(input_tensors[d], num_gather_devices))533  reduced_shards = []534  for d in range(0, num_gather_devices):535    with ops.device(gather_devices[d]):536      values = [s[d] for s in shards_by_source]537      red_shard = red_op(values)538      if un_op:539        red_shard = un_op(red_shard)540      reduced_shards.append(red_shard)541  return reduced_shards542def _build_shuffle_scatter(reduced_shards, dst_devices):543  """Build the scatter phase of shuffle all-reduce.544  Args:545    reduced_shards:  list of T @(tf.Tensor} fully reduced shards546    dst_devices: list of names of devices at which the fully-reduced value547      should be reconstituted.548  Returns:549    list of T @{tf.Tensor} scattered tensors.550  """551  num_devices = len(dst_devices)552  out_tensors = []553  for d in range(0, num_devices):554    with ops.device(dst_devices[d]):555      out_tensors.append(array_ops.concat(reduced_shards, 0))556  return out_tensors557def _split_by_task(devices, values):558  """Partition devices and values by common task.559  Args:560    devices: list of device name strings561    values: list of T @{tf.tensor} of same length as devices.562  Returns:563    (per_task_devices, per_task_values) where both values are564    lists of lists with isomorphic structure: the outer list is565    indexed by task, and the inner list has length of the number566    of values belonging to that task.  per_task_devices contains567    the specific devices to which the values are local, and568    per_task_values contains the corresponding values.569  Raises:570    ValueError: devices must be same length as values.571  """572  num_devices = len(devices)573  if num_devices != len(values):574    raise ValueError("len(devices) must equal len(values)")575  per_task_devices = collections.OrderedDict()576  per_task_values = collections.OrderedDict()577  for d in range(num_devices):578    d_spec = device_lib.DeviceSpec.from_string(devices[d])579    if not hasattr(d_spec, "task") or d_spec.task is None:580      assert False, "failed to parse device %s" % devices[d]581    index = (d_spec.job or "localhost", d_spec.replica or 0, d_spec.task)582    if index not in per_task_devices:583      per_task_devices[index] = []584      per_task_values[index] = []585    per_task_devices[index].append(devices[d])586    per_task_values[index].append(values[d])587  return (list(per_task_devices.values()), list(per_task_values.values()))588def build_nccl_all_reduce(input_tensors, red_op, un_op=None):589  """Build a subgraph that does one full all-reduce, using NCCL.590  Args:591    input_tensors: list of T @{tf.Tensor} of same-shape and type values to592      be reduced.593    red_op: binary elementwise reduction operator.  Must be one of594      {tf.add}595    un_op: optional unary elementwise Op to apply to fully-reduce values.596  Returns:597    list of T @{tf.Tensor} of reduced values.598  Raises:599    ValueError: red_op not supported.600  """601  if red_op == math_ops.add:602    output_tensors = nccl.all_sum(input_tensors)603  else:604    raise ValueError("red_op not supported by NCCL all-reduce: ", red_op)605  if un_op:606    un_op_wrapped = []607    for t in output_tensors:608      with ops.colocate_with(t):609        un_op_wrapped.append(un_op(t))610    output_tensors = un_op_wrapped611  return output_tensors612def _build_nccl_hybrid(input_tensors, red_op, upper_level_f):613  """Construct a subgraph for NCCL hybrid all-reduce.614  Args:615    input_tensors: list of T @{tf.Tensor} of same-shape and type values to616      be reduced.617    red_op: binary elementwise reduction operator.618    upper_level_f: function for reducing one value per worker, across619      workers.620  Returns:621    list of T @{tf.Tensor} of reduced values.622  Raises:623    ValueError: inputs not well-formed.624  """625  input_tensors, shape = _flatten_tensors(input_tensors)626  devices = [t.device for t in input_tensors]627  per_worker_devices, per_worker_values = _split_by_task(devices, input_tensors)628  num_workers = len(per_worker_devices)629  up_values = [None for w in range(0, num_workers)]630  up_devices = up_values[:]631  down_values = up_values[:]632  # First stage: reduce within each worker using NCCL633  for w in range(0, num_workers):634    worker_values = build_nccl_all_reduce(per_worker_values[w], red_op)635    # NOTE: these reductions will not run to completion unless636    # every output value is used.  Since we only need one, we637    # need to put control dependencies on the rest.638    with ops.control_dependencies(worker_values):639      with ops.device(worker_values[0].device):640        up_values[w] = array_ops.identity(worker_values[0])641      up_devices[w] = per_worker_devices[w][0]642  # Second stage: Apply upper_level_f to reduce across first device at643  # each worker644  level_2_output = upper_level_f(up_values)645  # Third stage: propagate within each worker using NCCL Broadcast646  for w in range(0, num_workers):647    dst_tensors = []648    with ops.device(per_worker_devices[w][0]):649      broadcast_src = nccl.broadcast(array_ops.identity(level_2_output[w]))650    for d in per_worker_devices[w]:651      with ops.device(d):652        dst_tensors.append(array_ops.identity(broadcast_src))653    down_values[w] = dst_tensors654  output_tensors = [v for sublist in down_values for v in sublist]655  if len(shape) != 1:656    output_tensors = _reshape_tensors(output_tensors, shape)657  return output_tensors658def _reduce_non_singleton(input_tensors, red_f, un_op):659  """If input_tensors has more than one element apply red_f, else apply un_op."""660  if len(input_tensors) > 1:661    return red_f(input_tensors)662  else:663    if not un_op:664      return input_tensors665    output_tensors = []666    for t in input_tensors:667      with ops.colocate_with(t):668        output_tensors.append(un_op(t))669    return output_tensors670def build_nccl_then_ring(input_tensors, subdiv, red_op, un_op=None):671  """Construct hybrid of NCCL within workers, Ring across workers."""672  def upper_builder(y):673    return build_ring_all_reduce(y, len(y), subdiv, [0], red_op, un_op)674  def upper_level_f(x):675    return _reduce_non_singleton(x, upper_builder, un_op)676  return _build_nccl_hybrid(input_tensors, red_op, upper_level_f)677def build_nccl_then_recursive_hd(input_tensors, red_op, un_op=None):678  """Construct hybrid of NCCL within workers, Recursive-HD across workers."""679  upper_level_f = lambda x: build_recursive_hd_all_reduce(x, red_op, un_op)680  return _build_nccl_hybrid(input_tensors, red_op, upper_level_f)681def build_nccl_then_shuffle(input_tensors, gather_devices, nccl_red_op,682                            shuffle_red_op, un_op=None):683  """Construct hybrid of NCCL within workers, Shuffle across workers."""684  upper_level_f = lambda x: build_shuffle_all_reduce(x, gather_devices,685                                                     shuffle_red_op, un_op)686  return _build_nccl_hybrid(input_tensors, nccl_red_op, upper_level_f)687def _build_shuffle_hybrid(input_tensors, gather_devices, red_op, upper_level_f):688  """Construct a subgraph for Shuffle hybrid all-reduce.689  Args:690    input_tensors: list of T @{tf.Tensor} of same-shape and type values to691      be reduced.692    gather_devices: list of device names on which to host gather shards.693    red_op: binary elementwise reduction operator.694    upper_level_f: function for reducing one value per worker, across695      workers.696  Returns:697    list of T @{tf.Tensor} of reduced values.698  Raises:699    ValueError: inputs not well-formed.700  """701  input_tensors, shape = _flatten_tensors(input_tensors)702  # First stage, reduce across each worker using gather_devices.703  devices = [t.device for t in input_tensors]704  per_worker_devices, per_worker_values = _split_by_task(devices, input_tensors)705  num_workers = len(per_worker_devices)706  up_values = []707  if len(gather_devices) != num_workers:708    raise ValueError("For shuffle hybrid, gather_devices must contain one "709                     "device per worker. ")710  for w in range(0, num_workers):711    reduced_shards = _build_shuffle_gather(712        per_worker_values[w], [gather_devices[w]], red_op)713    up_values.append(reduced_shards[0])714  # Second stage, apply upper_level_f.715  level_2_output = upper_level_f(up_values)716  # Third stage, apply shuffle scatter at each worker.717  output_tensors = []718  for w in range(0, num_workers):719    output_tensors += _build_shuffle_scatter(720        [level_2_output[w]], per_worker_devices[w])721  if len(shape) != 1:722    output_tensors = _reshape_tensors(output_tensors, shape)723  return output_tensors724def build_shuffle_then_ring(input_tensors, gather_devices, subdiv,725                            red_n_op, red_op, un_op=None):726  """Construct hybrid of Shuffle within workers, Ring across workers."""727  def upper_builder(tensors):728    return build_ring_all_reduce(tensors, len(tensors), subdiv, [0],729                                 red_op, un_op)730  def upper_level_f(tensors):731    return _reduce_non_singleton(tensors, upper_builder, un_op)732  return _build_shuffle_hybrid(733      input_tensors, gather_devices, red_n_op, upper_level_f)734def build_shuffle_then_shuffle(input_tensors, first_gather_devices,735                               second_gather_devices, red_op, un_op=None):736  """Construct hybrid of Shuffle within workers, Shuffle across workers."""737  def upper_builder(tensors):738    return build_shuffle_all_reduce(tensors, second_gather_devices,739                                    red_op, un_op)740  def upper_level_f(tensors):741    return _reduce_non_singleton(tensors, upper_builder, un_op)742  return _build_shuffle_hybrid(...app.py
Source:app.py  
1import paho.mqtt.client as mqtt2from flask import Flask,g,render_template,request,Response3from flask_cors import CORS4from flask_socketio import SocketIO, emit5from flask_apscheduler import APScheduler6from libDB import logDB,clearOldDB,queryDB7import numpy as np8import json9from libPickle import *10from pyimagesearch.motion_detection import SingleMotionDetector11from imutils.video import VideoStream12import threading13import argparse14import datetime15import imutils16import schedule17import time18import cv219#edit by chrome20"""21Date storage22- hot    : pickle devices data23- cold   :24- frozen : sqlite325""" 26class Config(object):27    SCHEDULER_API_ENABLED = True28    SECRET_KEY = 'secret!'29    threaded=True30    31scheduler = APScheduler()32# initialize the output frame and a lock used to ensure thread-safe33# exchanges of the output frames (useful for multiple browsers/tabs34# are viewing tthe stream)35outputFrame = None36lock = threading.Lock()37app = Flask(__name__)38app.config.from_object(Config())39CORS(app)40socketio = SocketIO(app)41# initialize the video stream and allow the camera sensor to42# warmup43#vs = VideoStream(usePiCamera=1).start()44vs = VideoStream(src=0).start()45# Create a dictionary called devices to store the device number, name, and device state:46devices = {47    'one' : {'floor' : 'fl1','position' : 'garage','object' : 'door', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl1/garage/door/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},48    'two' : {'floor' : 'fl1','position' : 'garage','object' : 'door', 'cmd' : '02', 'state' : 'False', 'type' : 'latch', 'topic' : 'fl1/garage/door/02', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},49    'three' : {'floor' : 'fl1','position' : 'com','object' : 'light', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl1/com/light/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},50    'four' : {'floor' : 'fl1','position' : 'com','object' : 'light', 'cmd' : '02', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl1/com/light/02', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},51    'five' : {'floor' : 'fl2','position' : 'liv','object' : 'fan', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl2/liv/fan/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},52    'six' : {'floor' : 'fl2','position' : 'cen','object' : 'light', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl2/cen/light/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},53    'seven' : {'floor' : 'fl3','position' : 'bed','object' : 'fan', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl3/bed/fan/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'}54    }55try:56    devices = load_pickle_obj('devices')57    # print(devices)58    print("devices exist")59except:60    save_pickle_obj(devices,'devices')61sensors = {62    'one' : {'name' : 'sensorName1'}63    }64jobCron = { 'startHour':'14',65            'startMinute':'00',66            'stopHour':'14',67            'stopMinute':'30'68    }69try:70    jobCron = load_pickle_obj('jobCron')71    # print(jobCron)72    print("jobCron exist")73except:74    save_pickle_obj(jobCron,'jobCron')75queryDatas = {'temp':{},76            'timeStamp':{},77            'humid':{}}78# Put the device dictionary into the template data dictionary:79templateData = {80    'devices' : devices,81    'sensors' : sensors,82    'jobCron': jobCron,83    'queryDatas':queryDatas84    }85@scheduler.task('cron', id='do_job_on', hour=jobCron['startHour'], minute=jobCron['startMinute'])86def jobOn():87    print('Job on executed')88    for device in devices:89        if devices[device]['type'] == 'on/off':90            if devices[device]['time_enable'] == 'True':91                devices[device]['state'] = 'True'92                mqtt_client.publish(devices[device]['topic'],"on")93            print(device+' '+devices[device]['time_enable'])94    try:95        save_pickle_obj(devices,'devices')96    except:97        print("error")98@scheduler.task('cron', id='do_job_off', hour=jobCron['stopHour'], minute=jobCron['stopMinute'])99def jobOff():100    print('Job off executed')101    for device in devices:102        if devices[device]['type'] == 'on/off':103            if devices[device]['time_enable'] == 'True':104                devices[device]['state'] = 'False'105                mqtt_client.publish(devices[device]['topic'],"off")106            print(device+' '+devices[device]['time_enable'])107    try:108        save_pickle_obj(devices,'devices')109    except:110        print("error")111@scheduler.task('cron', id='do_job_DB', minute='*')112def jobDB():113    print('Job DB executed')114    for device in devices:115        if devices[device]['th_enable'] == 'True':116            logDB(devices[device]['topic'],devices[device]['temp'],devices[device]['humid'],"DHT","true")117            try:118                save_pickle_obj(devices,'devices')119            except:120                print("error")121        122@scheduler.task('cron', id='do_job_ClrDB', day='*')123def jobClrDB():124    print('Job ClrDB executed')125    clearOldDB()126scheduler.init_app(app)127scheduler.start()128# The callback for when the client receives a CONNACK response from the server.129def on_connect(client, userdata, flags, rc):130    print("Connected with result code "+str(rc))131    # Subscribing in on_connect() means that if we lose the connection and132    # reconnect then subscriptions will be renewed.133    client.subscribe("fl1/#")#  + single-level wild card, # multi-level wild card, 134    client.subscribe("fl2/#")135    client.subscribe("fl3/#")136    client.subscribe("#")137    138def whx(ownval,owndicts):139    for x in owndicts:140        if (ownval) in owndicts[x].values():141            return x142    return 'none'143# The callback for when a PUBLISH message is received from the ESP8266.144def on_message(client, userdata, message):145    # socketio.emit('my variable')146    print("Received message '" + str(message.payload.decode('utf8')) + "' on topic '"+ message.topic + "' with QoS " + str(message.qos))147    if message.topic.startswith('fl'):148        topicMsgSplit = message.topic.split("/")149        slash_count = message.topic.count("/")150        if slash_count>=3:151            topicMsg = topicMsgSplit[0]+"/"+topicMsgSplit[1]+"/"+topicMsgSplit[2]+"/"+topicMsgSplit[3]152            indexName = whx(topicMsg,devices)153            devices_list = list(devices)154            n_index = devices_list.index(indexName)155            if "/temperature" in message.topic:156                devices[indexName]['th_enable'] = 'True'157                devices[indexName]['temp'] = str(message.payload.decode('utf8'))158                socketio.emit('temp' , {'data': devices[indexName]['temp'],'pos':n_index})159                #print("temperature update")160            if "/humidity" in message.topic:161                devices[indexName]['th_enable'] = 'True'162                devices[indexName]['humid'] = str(message.payload.decode('utf8'))163                socketio.emit('humid' , {'data': devices[indexName]['humid'],'pos':n_index})164                # print("humidity update")            165            if "/feedback" in message.topic:166                if "on" in str(message.payload.decode('utf8')):167                    if devices[indexName]['state'] == 'False':168                        devices[indexName]['state'] = 'True'169                        socketio.emit('refresh' , {})170                        # print("refresh")171                    172                else:173                    if devices[indexName]['state'] == 'True':174                        devices[indexName]['state'] = 'False'175                        socketio.emit('refresh' , {})176                        # print("refresh")177                # print("feedback update")178        else:179            print("mqtt message.topic error")180    elif message.topic.startswith('tele'):181        #print("tasmota")182        topicMsgSplit = message.topic.split("/")#[0]:tele,[1]:name,[2]:classify183        #topicMsg = topicMsgSplit[0]+"/"+topicMsgSplit[1]+"/"+topicMsgSplit[2]184        if topicMsgSplit[2] == "SENSOR":185            #print(topicMsgSplit[1])186            topic_json=json.dumps(str(message.payload.decode('utf8')))187            print(topic_json["ENERGY"])188            189    else:190        print("unknown topic: "+message.topic)191        192    try:193        save_pickle_obj(devices,'devices')194    except:195        print("error")196    197mqtt_client = mqtt.Client()198mqtt_client.on_connect = on_connect199mqtt_client.on_message = on_message200mqtt_client.connect("127.0.0.1",1883,60)201# mqtt_client.connect("192.168.2.46",1883,60)202mqtt_client.loop_start()203@app.route("/",methods=['GET', 'POST'])204def main():205    if request.method == 'GET':206        queryDatas['temp'] = queryDB("temp")207        queryDatas['humid'] = queryDB("humid")208        queryDatas['timeStamp'] = queryDB("timeStamp")209        # print(queryDatas['timeStamp'])210        templateData = {211            'devices' : devices,212            'sensors' : sensors,213            'jobCron': jobCron,214            'queryDatas':queryDatas215            }216        try:217            save_pickle_obj(devices,'devices')218        except:219            print("error")220        # Pass the template data into the template main.html and return it to the user221        return render_template('main.html', async_mode=socketio.async_mode, **templateData)222    elif request.method == 'POST':223        return 'post method do nothing'224    else:225        return 'method error'226@app.route("/debug")227def debug():228    templateData = {229        'devices' : devices,230        'sensors' : sensors,231        'jobCron': jobCron,232        'queryDatas':queryDatas233        }234    # Pass the template data into the template main.html and return it to the user235    return render_template('debug.html', async_mode=socketio.async_mode, **templateData)236# The function below is executed when someone requests a URL with the device number and action in it:237@app.route("/<device>/<floor>/<position>/<object>/<cmd>/<ctrl>",methods = ['POST', 'GET'])238def action(device, floor, position, object, cmd, ctrl):239    # Convert the device from the URL into an integer:240    #function = function#int(function)241    # Get the device name for the device being changed:242    # deviceName = devices[function]['name']243    # If the action part of the URL is "1" execute the code indented below:244    if ctrl == "on":245        mqtt_client.publish(devices[device]['topic'],"on")246        #print('mp '+devices[device]['topic'])247        devices[device]['state'] = 'True'248    # if ctrl == "0" and object == 'door':249    if ctrl == "off":250        mqtt_client.publish(devices[device]['topic'],"off")251        #print('mp '+devices[device]['topic'])252        devices[device]['state'] = 'False'253    if ctrl == "toggle":254        if devices[device]['state'] == 'True':255            mqtt_client.publish(devices[device]['topic'],"off")256            #print('mp '+devices[device]['topic'])257            devices[device]['state'] = 'False'258        else:   259            mqtt_client.publish(devices[device]['topic'],"on")260            #print('mp '+devices[device]['topic'])261            devices[device]['state'] = 'True'262    if ctrl == "click":263        mqtt_client.publish(devices[device]['topic'],"click")264        print('click '+devices[device]['topic'])265        devices[device]['state'] = 'False'266        267    # Along with the device dictionary, put the message into the template data dictionary:268    templateData = {269        'devices' : devices,270        'sensors' : sensors,271        'jobCron': jobCron,272        'queryDatas':queryDatas273    }274    try:275        save_pickle_obj(devices,'devices')276    except:277        print("error")278    return render_template('main.html', **templateData)279@app.route("/addSched/<device>")280def addSched(device):281    print('time_enable '+devices[device]['time_enable'])282    devices[device]['time_enable'] = 'True'283    templateData = {284        'devices' : devices,285        'sensors' : sensors,286        'jobCron': jobCron,287        'queryDatas':queryDatas288    }289    try:290        save_pickle_obj(devices,'devices')291    except:292        print("error")293    return render_template('main.html', **templateData)294    295    296@app.route("/rmSched/<device>")297def rmSched(device):298    print('time_enable '+devices[device]['time_enable'])299    devices[device]['time_enable'] = 'False'300    templateData = {301        'devices' : devices,302        'sensors' : sensors,303        'jobCron': jobCron,304        'queryDatas':queryDatas305    }306    try:307        save_pickle_obj(devices,'devices')308    except:309        print("error")310    return render_template('main.html', **templateData)311@app.route('/startTime',methods = ['POST', 'GET'])312def startTime():313    if request.method == 'POST':314        print(request.form)315        result1 = str(request.form['startTime1'])316        result2 = str(request.form['startTime2'])317        for job in scheduler.get_jobs():318            print(job.id)319        320        try:321            scheduler.scheduler.reschedule_job('do_job_on', trigger='cron', hour=result1, minute=result2)322            jobCron['startHour']=result1323            jobCron['startMinute']=result2324        except:325            pass326    templateData = {327        'devices' : devices,328        'sensors' : sensors,329        'jobCron': jobCron,330        'queryDatas':queryDatas331    }332    try:333        save_pickle_obj(devices,'devices')334        save_pickle_obj(jobCron,'jobCron')335    except:336        print("error")337    return render_template('main.html', **templateData)338        339@app.route('/stopTime',methods = ['POST', 'GET'])340def stopTime():341    if request.method == 'POST':342        result1 = str(request.form['stopTime1'])343        result2 = str(request.form['stopTime2'])344        for job in scheduler.get_jobs():345            print(job.id)346                347        try:348            scheduler.scheduler.reschedule_job('do_job_off', trigger='cron', hour=result1, minute=result2)349            jobCron['stopHour']=result1350            jobCron['stopMinute']=result2351        except:352            pass353        354    templateData = {355        'devices' : devices,356        'sensors' : sensors,357        'jobCron': jobCron,358        'queryDatas':queryDatas359    }360    try:361        save_pickle_obj(devices,'devices')362        save_pickle_obj(jobCron,'jobCron')363    except:364        print("error")365    return render_template('main.html', **templateData)366@app.route("/videoStream")367def videoStream():368    # return the rendered template369    return render_template("videoStream.html")370def detect_motion(frameCount):371    # grab global references to the video stream, output frame, and372    # lock variables373    global vs, outputFrame, lock374    # initialize the motion detector and the total number of frames375    # read thus far376    md = SingleMotionDetector(accumWeight=0.1)377    total = 0378    # loop over frames from the video stream379    while True:380        # read the next frame from the video stream, resize it,381        # convert the frame to grayscale, and blur it382        frame = vs.read()383        frame = imutils.resize(frame, width=400)384        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)385        gray = cv2.GaussianBlur(gray, (7, 7), 0)386        # grab the current timestamp and draw it on the frame387        timestamp = datetime.datetime.now()388        cv2.putText(frame, timestamp.strftime("%A %d %B %Y %I:%M:%S%p"), 389            (10, frame.shape[0] - 10),cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 255), 1)390        # if the total number of frames has reached a sufficient391        # number to construct a reasonable background model, then392        # continue to process the frame393        if total > frameCount:394            # detect motion in the image395            motion = md.detect(gray)396            # cehck to see if motion was found in the frame397            if motion is not None:398                # unpack the tuple and draw the box surrounding the399                # "motion area" on the output frame400                (thresh, (minX, minY, maxX, maxY)) = motion401                cv2.rectangle(frame, (minX, minY), (maxX, maxY),(0, 0, 255), 2)402        403        # update the background model and increment the total number404        # of frames read thus far405        md.update(gray)406        total += 1407        # acquire the lock, set the output frame, and release the408        # lock409        with lock:410            outputFrame = frame.copy()411            # outputFrame = gray.copy()412            413        414def generate():415    # grab global references to the output frame and lock variables416    global outputFrame, lock417    # loop over frames from the output stream418    while True:419        # wait until the lock is acquired420        with lock:421            # check if the output frame is available, otherwise skip422            # the iteration of the loop423            if outputFrame is None:424                continue425            # encode the frame in JPEG format426            (flag, encodedImage) = cv2.imencode(".jpg", outputFrame)427            # ensure the frame was successfully encoded428            if not flag:429                continue430        # yield the output frame in the byte format431        yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encodedImage) + b'\r\n')432@app.route("/video_feed")433def video_feed():434    # return the response generated along with the specific media435    # type (mime type)436    return Response(generate(),mimetype = "multipart/x-mixed-replace; boundary=frame")437        438@socketio.on('my event')439def handle_my_custom_event(json):440    #print('received json data here: ' + str(json))441    pass442# function for responses443def results():444    # build a request object445    req = request.get_json(silent=True, force=True)446    print("Request:")447    print(json.dumps(req, indent=4))448    # print(req,flush=True)449    # fetch fulfillmentText from json450    fulfillmentText = req.get('queryResult').get('fulfillmentText')451    # print(req,flush=True)452    # print(fulfillmentText, flush=True)453    mqtt_client.publish("fl1/garage/door/02","slide")454    # return a fulfillment response455    #return {'fulfillmentText': 'This is a response from webhook.'}456    return 'results'457@app.route('/webhook',methods=['GET','POST'])458def webhook():459    if request.method == 'POST':460        return results()461    else:462        return 'Hello'463if __name__ == "__main__":464    t = threading.Thread(target=detect_motion, args=(64,))465    t.daemon = True466    t.start()467    socketio.run(app, host='0.0.0.0', port=80, debug=True, use_reloader=False)468    469# release the video stream pointer...path.py
Source:path.py  
...158        List of devices ordered by coordinates159        """160        return sorted(self.devices, key=lambda dev: dev.md.z)161    @property162    def blocking_devices(self):163        """164        A list of devices that are currently inserted or are in unknown165        positions. This includes devices downstream of the first166        :attr:`.impediment`167        """168        # Cache important prior devices169        prior = None170        last_branches = list()171        block = list()172        for device in self.path:173            # If we have switched beamlines174            if prior and device.md.beamline != prior.md.beamline:175                # Find improperly configured optics176                for optic in last_branches:177                    # If this optic is responsible for delivering beam178                    # to this hutch and it is not configured to do so.179                    # Mark it as blocking180                    if device.md.beamline in optic.branches:181                        if device.md.beamline not in optic.destination:182                            block.append(optic)183                    # Otherwise ensure it is removed from the beamline184                    elif optic.md.beamline not in optic.destination:185                        block.append(optic)186                # Clear optics that have been evaluated187                last_branches.clear()188            # If our last device was an optic, make sure it wasn't required189            # to continue along this beampath190            elif (prior in last_branches191                    and device.md.beamline in prior.branches192                    and device.md.beamline not in prior.destination):193                block.append(last_branches.pop(-1))194            # Find branching devices and store195            # They will be marked as blocking by downstream devices196            dev_state = find_device_state(device)197            if device in self.branches:198                last_branches.append(device)199            # Find inserted devices200            elif dev_state == DeviceState.Inserted:201                # Ignore devices with low enough transmssion202                trans = getattr(device, 'transmission', 1)203                if trans < self.minimum_transmission:204                    block.append(device)205            # Find unknown and faulted devices206            elif dev_state != DeviceState.Removed:207                block.append(device)208            # Stache our prior device209            prior = device210        return block211    @property212    def incident_devices(self):213        """214        A list of devices the beam is currently incident on. This includes the215        current :attr:`.impediment` and any upstream devices that may be216        inserted but have more transmission than :attr:`.minimum_transmission`217        """218        # Find device information219        inserted = [d for d in self.path220                    if find_device_state(d) == DeviceState.Inserted]221        impediment = self.impediment222        # No blocking devices, all inserted devices incident223        if not impediment:224            return inserted225        # Otherwise only return upstream of the impediment226        return [d for d in inserted if d.md.z <= impediment.md.z]227    def show_devices(self, file=None):228        """229        Print a table of the devices along the beamline230        Parameters231        ----------232        file : file-like object233            File to writable234        """235        # Initialize Table236        pt = PrettyTable(['Name', 'Prefix', 'Position', 'Beamline', 'State'])237        # Adjust Table settings238        pt.align = 'r'239        pt.align['Name'] = 'l'240        pt.align['Prefix'] = 'l'241        pt.float_format = '8.5'...vscsi_util.py
Source:vscsi_util.py  
...136                        devname = sg137                    scsi_id = _vscsi_get_scsiid(sg)138            devices.append([hctl, devname, sg, scsi_id])139    return devices140def vscsi_get_scsidevices(mask="*"):141    """ get all scsi devices information """142    devices = _vscsi_get_scsidevices_by_lsscsi("[%s]" % mask)143    if devices or (len(mask) and mask[0] != "*"):144        # devices found or partial device scan145        return devices146    return _vscsi_get_scsidevices_by_sysfs()147def vscsi_get_hctl_and_devname_by(target, scsi_devices = None):148    if target.startswith('/dev/'):149        target = os.path.realpath(target)150    if scsi_devices is None:151        if len(target.split(':')) == 4:152            scsi_devices = _vscsi_get_scsidevices_by_lsscsi(target)153        elif target.startswith('/dev/'): 154            scsi_devices = _vscsi_get_scsidevices_by_lsscsi("| grep %s" % target)155        else:156            scsi_devices = _vscsi_get_scsidevices_by_lsscsi("")157        if not scsi_devices:158            scsi_devices = _vscsi_get_scsidevices_by_sysfs()159    if len(target.split(':')) == 4:160        return _vscsi_get_devname_by(target, scsi_devices)161    else:162        return _vscsi_get_hctl_by(target, scsi_devices)163def get_scsi_vendor(pHCTL):164    try:165        sysfs_mnt = utils.find_sysfs_mount() 166        sysfs_scsi_dev_path = \167            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)168        scsi_vendor = \169            os.popen('cat ' + sysfs_scsi_dev_path + \170                              SYSFS_SCSI_DEV_VENDOR_PATH).read()171        return scsi_vendor.splitlines()[0]172    except:173        return None174def get_scsi_model(pHCTL):175    try:176        sysfs_mnt = utils.find_sysfs_mount() 177        sysfs_scsi_dev_path = \178            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)179        scsi_model = \180            os.popen('cat ' + sysfs_scsi_dev_path + \181                              SYSFS_SCSI_DEV_MODEL_PATH).read()182        return scsi_model.splitlines()[0]183    except:184        return None185def get_scsi_typeid(pHCTL):186    try:187        sysfs_mnt = utils.find_sysfs_mount() 188        sysfs_scsi_dev_path = \189            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)190        scsi_typeid = \191            os.popen('cat ' + sysfs_scsi_dev_path + \192                              SYSFS_SCSI_DEV_TYPEID_PATH).read()193        return int(scsi_typeid.splitlines()[0])194    except:195        return None196def get_scsi_revision(pHCTL):197    try:198        sysfs_mnt = utils.find_sysfs_mount() 199        sysfs_scsi_dev_path = \200            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)201        scsi_revision = \202            os.popen('cat ' + sysfs_scsi_dev_path + \203                              SYSFS_SCSI_DEV_REVISION_PATH).read()204        return scsi_revision.splitlines()[0]205    except:206        return None207def get_scsi_scsilevel(pHCTL):208    try:209        sysfs_mnt = utils.find_sysfs_mount() 210        sysfs_scsi_dev_path = \211            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)212        scsi_scsilevel = \213            os.popen('cat ' + sysfs_scsi_dev_path + \214                              SYSFS_SCSI_DEV_SCSILEVEL_PATH).read()215        return int(scsi_scsilevel.splitlines()[0])216    except:217        return None218def _make_scsi_record(scsi_info):219    scsi_rec = {220        'physical_HCTL': scsi_info[0],221        'dev_name': None,222        'sg_name': scsi_info[2],223        'scsi_id': None224    }225    if scsi_info[1] is not None:226        scsi_rec['dev_name'] = scsi_info[1] 227    if scsi_info[3] is not None:228        scsi_rec['scsi_id'] = scsi_info[3] 229    scsi_rec['vendor_name'] = \230        get_scsi_vendor(scsi_rec['physical_HCTL'])231    scsi_rec['model'] = \232        get_scsi_model(scsi_rec['physical_HCTL'])233    scsi_rec['type_id'] = \234        get_scsi_typeid(scsi_rec['physical_HCTL'])235    scsi_rec['revision'] = \236        get_scsi_revision(scsi_rec['physical_HCTL'])237    scsi_rec['scsi_level'] = \238        get_scsi_scsilevel(scsi_rec['physical_HCTL'])239    try:240        lsscsi_info = os.popen('lsscsi %s 2>/dev/null' % scsi_rec['physical_HCTL']).read().split()241        scsi_rec['type'] = lsscsi_info[1]242    except:243        scsi_rec['type'] = None244    return scsi_rec245def get_scsi_device(pHCTL):246    scsis_info = _vscsi_get_scsidevices_by_lsscsi(pHCTL)247    if not scsis_info:248        scsis_info = _vscsi_get_scsidevices_by_sysfs()249    for scsi_info in scsis_info:250        if scsi_info[0] == pHCTL:251            return _make_scsi_record(scsi_info)252    return None253def get_all_scsi_devices(mask="*"):254    scsi_records = []255    for scsi_info in vscsi_get_scsidevices(mask):256        scsi_record = _make_scsi_record(scsi_info)257        scsi_records.append(scsi_record)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
