Best Python code snippet using playwright-python
all_reduce.py
Source:all_reduce.py  
1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#     http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14# ==============================================================================15"""Utilities to construct a TF subgraph implementing distributed All-Reduce."""16from __future__ import absolute_import17from __future__ import division18from __future__ import print_function19import collections20import math21from tensorflow.contrib import nccl22from tensorflow.python.framework import device as device_lib23from tensorflow.python.framework import ops24from tensorflow.python.ops import array_ops25from tensorflow.python.ops import math_ops26def _flatten_tensors(tensors):27  """Check tensors for isomorphism and flatten.28  Args:29    tensors: list of T @{tf.Tensor} which must all have the same shape.30  Returns:31    tensors: a list of T @{tf.Tensor} which are flattened (1D) views of tensors32    shape: the original shape of each element of input tensors33  Raises:34    ValueError: tensors are empty or non-isomorphic or have unknown shape.35  """36  if not tensors:37    raise ValueError("tensors cannot be empty")38  shape = tensors[0].shape39  for tensor in tensors:40    shape = shape.merge_with(tensor.shape)41  if not shape.is_fully_defined():42    raise ValueError("Tensors must have statically known shape.")43  if len(shape) != 1:44    reshaped = []45    for t in tensors:46      with ops.colocate_with(t):47        reshaped.append(array_ops.reshape(t, [-1]))48    tensors = reshaped49  return tensors, shape50def _reshape_tensors(tensors, shape):51  """Reshape tensors flattened by _flatten_tensors.52  Args:53    tensors: list of T @{tf.Tensor} of identical length 1D tensors.54    shape: list of integers describing the desired shape.  Product of55      the elements must equal the length of each tensor.56  Returns:57    list of T @{tf.Tensor} which are the reshaped inputs.58  """59  reshaped = []60  for t in tensors:61    with ops.colocate_with(t):62      reshaped.append(array_ops.reshape(t, shape))63  return reshaped64def _padded_split(tensor, pieces):65  """Like split for 1D tensors but pads-out case where len % pieces != 0.66  Args:67    tensor: T @{tf.Tensor} that must be 1D.68    pieces: a positive integer specifying the number of pieces into which69      tensor should be split.70  Returns:71    list of T @{tf.Tensor} of length pieces, which hold the values of72      thin input tensor, in order.  The final tensor may73      be zero-padded on the end to make its size equal to those of all74      of the other tensors.75  Raises:76    ValueError: The input tensor is not 1D.77  """78  shape = tensor.shape79  if 1 != len(shape):80    raise ValueError("input tensor must be 1D")81  tensor_len = shape[0].value82  with ops.colocate_with(tensor):83    if tensor_len % pieces != 0:84      # pad to an even length85      chunk_size = 1 + tensor_len // pieces86      if pieces > tensor_len:87        # This is an edge case that should not come up in practice,88        # i.e. a different reduction algorithm would be better,89        # but we'll make it work just for completeness.90        pad_len = pieces - tensor_len91        extended_whole = array_ops.concat(92            [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)93        parts = array_ops.split(extended_whole, pieces)94        return parts, pad_len95      elif (pieces - 1) * chunk_size >= tensor_len:96        # Another edge case of limited real interest.97        pad_len = (pieces * chunk_size) % tensor_len98        extended_whole = array_ops.concat(99            [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)100        parts = array_ops.split(extended_whole, pieces)101        return parts, pad_len102      else:103        last_chunk_size = tensor_len - (pieces - 1) * chunk_size104        pad_len = chunk_size - last_chunk_size105        piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]106        parts = array_ops.split(tensor, piece_lens)107        parts[-1] = array_ops.concat(108            [parts[-1], array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)109        return parts, pad_len110    else:111      return array_ops.split(tensor, pieces), 0112def _strip_padding(tensors, pad_len):113  """Strip the suffix padding added by _padded_split.114  Args:115    tensors: list of T @{tf.Tensor} of identical length 1D tensors.116    pad_len: number of elements to be stripped from the end of each tensor.117  Returns:118    list of T @{tf.Tensor} which are the stripped inputs.119  Raises:120    ValueError: tensors must be a non-empty list of 1D tensors, and121      each must be longer than pad_len.122  """123  if not tensors:124    raise ValueError("tensors cannot be empty")125  shape = tensors[0].shape126  if len(shape) > 1:127    raise ValueError("tensors must be 1D")128  prefix_len = int(shape[0] - pad_len)129  if prefix_len < 0:130    raise ValueError("pad_len longer than tensor")131  stripped = []132  for t in tensors:133    with ops.colocate_with(t):134      stripped.append(array_ops.slice(t, [0], [prefix_len]))135  return stripped136def _ragged_split(tensor, pieces):137  """Like split for 1D tensors but allows case where len % pieces != 0.138  Args:139    tensor: T @{tf.Tensor} that must be 1D.140    pieces: a positive integer specifying the number of pieces into which141      tensor should be split.142  Returns:143    list of T @{tf.Tensor} of length pieces, which hold the values of144      the input tensor, in order.  The final tensor may be shorter145      than the others, which will all be of equal length.146  Raises:147    ValueError: input tensor must be 1D.148  """149  shape = tensor.shape150  if 1 != len(shape):151    raise ValueError("input tensor must be 1D")152  tensor_len = shape[0].value153  chunk_size = tensor_len // pieces154  with ops.colocate_with(tensor):155    if tensor_len != (pieces * chunk_size):156      # last piece will be short157      assert pieces > 1158      last_chunk_size = tensor_len - ((pieces - 1) * chunk_size)159      assert last_chunk_size > 0160      piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]161      return array_ops.split(tensor, piece_lens)162    else:163      return array_ops.split(tensor, pieces)164def _ring_permutations(num_workers, num_subchunks, gpu_perm):165  """"Generate an array of device index arrays, one for each subchunk.166  In the basic ring reduction algorithm there are size(T)/num_devices167  data chunks and each device process one chunk per tick, i.e. sending168  one chunk and receiving one chunk.  The idea of subchunking is that169  each device processes num_subchunks smaller data regions per tick,170  and the ring rank permutation is different for each subchunk index171  so that a device is potentially sending to and receiving from172  num_subchunks different other devices at each tick.  Where multiple173  independent data channels exist between devices, this strategy174  supplies a method of using them in parallel.175  Args:176    num_workers: number of worker tasks177    num_subchunks: number of subchunks into which to divide each per-GPU chunk.178    gpu_perm: an array of integers in [0, num_gpus-1] giving the default179      ring order of GPUs at each worker.  Other permutations will be generated180      by rotating this array and splicing together per-worker instances.181  Raises:182    ValueError: the number of subchunks may not exceed the number of GPUs.183  Returns:184    pred_by_s_d: list of lists that maps (by index) from (subchunk, dev) to185        preceding device in the permutation for that subchunk.  The186        device index of GPU i at worker j is i + (j * num_gpus).187    rank_by_s_d: list of lists that maps (by index) from (subchunk, dev) to188       local rank of device d in the permutation for that subchunk.189  """190  num_gpus = len(gpu_perm)191  devices = num_workers * num_gpus192  if devices == 0:193    return [], []194  if num_subchunks > num_gpus:195    raise ValueError(196        "num_subchunks %d must be <= num_gpus %d" % (num_subchunks, num_gpus))197  rotation_interval = max(1, int(num_gpus / num_subchunks))198  perms_by_s = []199  for s in range(0, num_subchunks):200    full_order = []201    offset = s * rotation_interval202    for w in range(0, num_workers):203      default_order = [(w * num_gpus) + i for i in gpu_perm]204      dev_order = default_order[offset:] + default_order[:offset]205      full_order += dev_order206    perms_by_s.append(full_order)207  pred_by_s_d = [[-1 for d in range(0, devices)]208                 for s in range(0, num_subchunks)]209  rank_by_s_d = [[-1 for d in range(0, devices)]210                 for s in range(0, num_subchunks)]211  for s in range(0, num_subchunks):212    for d in range(0, devices):213      for t in range(0, devices):214        if d == perms_by_s[s][t]:215          rank_by_s_d[s][d] = t216          pred_by_s_d[s][d] = perms_by_s[s][(t + devices - 1) % devices]217          break218  return (pred_by_s_d, rank_by_s_d)219def build_ring_all_reduce(input_tensors, num_workers, num_subchunks,220                          gpu_perm, red_op, un_op=None):221  """Construct a subgraph performing a ring-style all-reduce of input_tensors.222  Args:223    input_tensors: a list of T @{tf.Tensor} objects, which must all224      have the same shape and type.225    num_workers: number of worker tasks spanned by input_tensors.226    num_subchunks: number of subchunks each device should process in one tick.227    gpu_perm: a list of ints giving a ring-wise rank ordering of GPUs at228      each worker.  All workers must have the same number of229      GPUs with the same rank ordering.  If NVLINK is available, this should230      be a ring order supported by NVLINK edges.231    red_op: a binary operator for elementwise reduction.232    un_op: an optional unary operator to apply to fully reduced values.233  Raises:234    ValueError: empty input_tensors or they don't all have same235    size.236  Returns:237    a list of T @{tf.Tensor} identical sum-reductions of input_tensors.238  """239  if len(input_tensors) < 2:240    raise ValueError("input_tensors must be length 2 or longer")241  input_tensors, shape = _flatten_tensors(input_tensors)242  devices = [t.device for t in input_tensors]243  (pred_by_s_d, rank_by_s_d) = _ring_permutations(244      num_workers, num_subchunks, gpu_perm)245  chunks_by_dev, pad_len = _build_ring_gather(246      input_tensors, devices,247      num_subchunks, pred_by_s_d, rank_by_s_d, red_op)248  if un_op:249    chunks_by_dev = _apply_unary_to_chunks(un_op, chunks_by_dev)250  output_tensors = _build_ring_scatter(pred_by_s_d, rank_by_s_d,251                                       chunks_by_dev)252  if pad_len > 0:253    output_tensors = _strip_padding(output_tensors, pad_len)254  if len(shape) != 1:255    output_tensors = _reshape_tensors(output_tensors, shape)256  return output_tensors257def _build_ring_gather(input_tensors, devices, num_subchunks,258                       pred_by_s_d, rank_by_s_d, red_op):259  """Construct a subgraph for the first (reduction) pass of ring all-reduce.260  Args:261    input_tensors: a list of T @{tf.Tensor} 1D input tensors of same262      shape and type.263    devices: array of device name strings264    num_subchunks: number of subchunks each device should process in one tick.265    pred_by_s_d: as produced by _ring_permutations266    rank_by_s_d: as produced by _ring_permutations267    red_op: a binary operator for elementwise reduction268  Raises:269    ValueError: tensors must all be one dimensional.270  Returns:271    list of list of T @{tf.Tensor} of (partially) reduced values where272    exactly num_subchunks chunks at each device are fully reduced.273  """274  num_devices = len(input_tensors)275  if num_devices == 0:276    return []277  if num_devices == 1:278    return input_tensors279  shape = input_tensors[0].shape280  if 1 != len(shape):281    raise ValueError("input tensors must be 1D")282  num_chunks = num_devices * num_subchunks283  num_ticks = num_devices - 1284  # Initialize chunks_by_dev with splits of the input tensors.285  chunks_by_dev = []286  split_pad_len = 0287  for d in range(0, num_devices):288    with ops.device(devices[d]):289      splits, split_pad_len = _padded_split(input_tensors[d], num_chunks)290      chunks_by_dev.append(splits)291  # Reduction phase292  for tick in range(0, num_ticks):293    # One new partial reduction for every chunk294    new_partial_reductions = [None for _ in range(0, num_chunks)]295    # Compute reductions with respect to last tick's values296    for d in range(0, num_devices):297      with ops.device(devices[d]):298        for s in range(0, num_subchunks):299          rank = rank_by_s_d[s][d]300          seg_index = (rank + num_devices - (2 + tick)) % num_devices301          pred_dev = pred_by_s_d[s][d]302          chunk_index = (seg_index * num_subchunks) + s303          new_partial_reductions[chunk_index] = red_op(304              chunks_by_dev[pred_dev][chunk_index],305              chunks_by_dev[d][chunk_index])306    # Update chunks_by_dev with the new values at the end of the tick.307    for d in range(0, num_devices):308      for s in range(0, num_subchunks):309        rank = rank_by_s_d[s][d]310        seg_index = (rank + num_devices - (2 + tick)) % num_devices311        chunk_index = (seg_index * num_subchunks) + s312        chunks_by_dev[d][chunk_index] = new_partial_reductions[chunk_index]313  return chunks_by_dev, split_pad_len314def _apply_unary_to_chunks(f, chunks_by_dev):315  """Apply a unary op to each tensor in chunks_by_dev, on same device.316  Args:317    f: a unary function over T @{tf.Tensor}.318    chunks_by_dev: list of lists of T @{tf.Tensor}.319  Returns:320    new list of lists of T @{tf.Tensor} with the same structure as321    chunks_by_dev containing the derived tensors.322  """323  output = []324  for x in chunks_by_dev:325    with ops.colocate_with(x[0]):326      output.append([f(t) for t in x])327  return output328def _build_ring_scatter(pred_by_s_d, rank_by_s_d,329                        chunks_by_dev):330  """Construct subgraph for second (scatter) pass of ring all-reduce.331  Args:332    pred_by_s_d: as produced by _ring_permutations333    rank_by_s_d: as produced by _ring_permutations334    chunks_by_dev: list of list of T @{tf.Tensor} indexed by ints335      (device, chunk)336  Raises:337    ValueError: chunks_by_dev is not well-formed338  Returns:339    list of T @{tf.Tensor} which are the fully reduced tensors, one340    at each device corresponding to the outer dimension of chunks_by_dev.341  """342  num_devices = len(chunks_by_dev)343  num_chunks = len(chunks_by_dev[0])344  if 0 != num_chunks % num_devices:345    raise ValueError(346        "Expect number of chunks per device to be divisible by num_devices")347  num_subchunks = int(num_chunks / num_devices)348  num_ticks = num_devices - 1349  for tick in range(0, num_ticks):350    passed_values = [None for _ in range(0, num_chunks)]351    for d in range(0, num_devices):352      with ops.colocate_with(chunks_by_dev[d][0]):353        for s in range(0, num_subchunks):354          rank = rank_by_s_d[s][d]355          seg_index = (rank + num_devices - (1 + tick)) % num_devices356          pred_dev = pred_by_s_d[s][d]357          chunk_index = (seg_index * num_subchunks) + s358          passed_values[chunk_index] = array_ops.identity(359              chunks_by_dev[pred_dev][chunk_index])360    for d in range(0, num_devices):361      for s in range(0, num_subchunks):362        rank = rank_by_s_d[s][d]363        seg_index = (rank + num_devices - (1 + tick)) % num_devices364        chunk_index = (seg_index * num_subchunks) + s365        chunks_by_dev[d][chunk_index] = passed_values[chunk_index]366  # Join chunks at each device.367  output = []368  for x in chunks_by_dev:369    with ops.colocate_with(x[0]):370      output.append(array_ops.concat(x, 0))371  return output372def build_recursive_hd_all_reduce(input_tensors, red_op, un_op=None):373  """Construct a subgraph for recursive halving-doubling all-reduce.374  The recursive halving-doubling algorithm is described in375  http://www.mcs.anl.gov/~thakur/papers/ijhpca-coll.pdf376  The concept is to arrange the participating n devices in377  a linear sequence where devices exchange data pairwise378  with one other device in each round.  During the gather379  phase there are lg(n) rounds where devices exchange380  increasingly smaller sub-tensors with another device381  at increasingly greater distances, until at the top382  each device has 1/n of the fully reduced values.  During the383  scatter phase each device exchanges its fully reduced384  sub-tensor (which doubles in length at each round)385  with one other device at increasingly smaller distances386  until each device has all of the fully reduced values.387  Note: this preliminary version requires that len(input_tensors) be a388    power of 2.  TODO(tucker): relax this restriction.  Also, the389    number of elements in each tensor must be divisible by 2^h where h390    is the number of hops in each phase.  This will also be relaxed in391    the future with edge-case specific logic.392  Args:393    input_tensors: list of T @{tf.Tensor} to be elementwise reduced.394    red_op: a binary elementwise reduction Op.395    un_op: an optional unary elementwise Op to apply to reduced values.396  Returns:397    list of T @{tf.Tensor} which are the fully reduced tensors, one398    at each device of input_tensors.399  Raises:400    ValueError: num_devices not a power of 2, or tensor len not divisible401    by 2 the proper number of times.402  """403  devices = [t.device for t in input_tensors]404  input_tensors, shape = _flatten_tensors(input_tensors)405  reduced_shards = _build_recursive_hd_gather(input_tensors, devices, red_op)406  if un_op:407    reduced_shards = [un_op(t) for t in reduced_shards]408  output_tensors = _build_recursive_hd_scatter(reduced_shards, devices)409  if len(shape) != 1:410    output_tensors = _reshape_tensors(output_tensors, shape)411  return output_tensors412def _build_recursive_hd_gather(input_tensors, devices, red_op):413  """Construct the gather phase of recursive halving-doubling all-reduce.414  Args:415    input_tensors: list of T @{tf.Tensor} to be elementwise reduced.416    devices: a list of strings naming the devices hosting input_tensors,417      which will also be used to host the (partial) reduction values.418    red_op: a binary elementwise reduction Op.419  Returns:420    list of T @{tf.Tensor} which are the fully reduced tensor shards.421  Raises:422    ValueError: num_devices not a power of 2, or tensor len not divisible423    by 2 the proper number of times.424  """425  num_devices = len(devices)426  num_hops = int(math.log(num_devices, 2))427  if num_devices != (2 ** num_hops):428    raise ValueError("num_devices must be a power of 2")429  chunks = input_tensors430  for h in range(0, num_hops):431    span = 2 ** h432    group_size = span * 2433    new_chunks = [[] for _ in devices]434    for d in range(0, num_devices):435      if (d % group_size) >= (group_size / 2):436        # skip right half of a pair437        continue438      left_dev = devices[d]439      right_dev = devices[d + span]440      left_split = array_ops.split(chunks[d], 2)441      right_split = array_ops.split(chunks[d+span], 2)442      with ops.device(left_dev):443        new_chunks[d] = red_op(left_split[0], right_split[0])444      with ops.device(right_dev):445        new_chunks[d + span] = red_op(left_split[1], right_split[1])446    chunks = new_chunks447  return chunks448def _build_recursive_hd_scatter(input_tensors, devices):449  """Construct the scatter phase of recursive halving-doublng all-reduce.450  Args:451    input_tensors: list of T @{tf.Tensor} that are fully-reduced shards.452    devices: a list of strings naming the devices on which the reconstituted453      full tensors should be placed.454  Returns:455    list of T @{tf.Tensor} which are the fully reduced tensors.456  """457  num_devices = len(devices)458  num_hops = int(math.log(num_devices, 2))459  assert num_devices == (2 ** num_hops), "num_devices must be a power of 2"460  chunks = input_tensors461  for h in reversed(range(0, num_hops)):462    span = 2 ** h463    group_size = span * 2464    new_chunks = [[] for _ in devices]465    for d in range(0, num_devices):466      if (d % group_size) >= (group_size / 2):467        # skip right half of a pair468        continue469      left_idx = d470      right_idx = d + span471      left_dev = devices[left_idx]472      right_dev = devices[right_idx]473      with ops.device(left_dev):474        new_chunks[left_idx] = array_ops.concat([chunks[left_idx],475                                                 chunks[right_idx]], 0)476      with ops.device(right_dev):477        new_chunks[right_idx] = array_ops.concat([chunks[left_idx],478                                                  chunks[right_idx]], 0)479    chunks = new_chunks480  return chunks481def build_shuffle_all_reduce(input_tensors, gather_devices, red_op, un_op=None):482  """Construct a subgraph for shuffle all-reduce.483  Shuffle reduce is essentially the algorithm implemented when using484  parameter servers.  Suppose tensor length is n, there are d devices485  and g gather shards.  Each device sends a n/g length sub-tensor to486  each gather shard.  The gather shards perform a reduction across d487  fragments, then broadcast the result back to each device.  The488  devices then join the g fully reduced fragments they receive from489  the shards.  The gather shards could perform d-1 pairwise490  reductions, or one d-way reduction.  The first is better where491  reduction Op time is low compared to transmission time, the second492  better in the other case.493  Args:494    input_tensors: list of T @(tf.Tensor} values to be reduced.495    gather_devices: list of names of devices on which reduction shards496      should be placed.497    red_op: an n-array elementwise reduction Op498    un_op: optional elementwise unary Op to be applied to fully-reduced values.499  Returns:500    list of T @{tf.Tensor} which are the fully reduced tensors.501  """502  input_tensors, shape = _flatten_tensors(input_tensors)503  dst_devices = [t.device for t in input_tensors]504  reduced_shards = _build_shuffle_gather(input_tensors, gather_devices,505                                         red_op, un_op)506  output_tensors = _build_shuffle_scatter(reduced_shards, dst_devices)507  if len(shape) != 1:508    output_tensors = _reshape_tensors(output_tensors, shape)509  return output_tensors510def _build_shuffle_gather(input_tensors, gather_devices, red_op, un_op=None):511  """Construct the gather (concentrate and reduce) phase of shuffle all-reduce.512  Args:513    input_tensors: list of T @(tf.Tensor} values to be reduced.514    gather_devices: list of names of devices on which reduction shards515      should be placed.516    red_op: the binary reduction Op517    un_op: optional elementwise unary Op to be applied to fully-reduced values.518  Returns:519    list of T @{tf.Tensor} which are the fully reduced shards.520  Raises:521    ValueError: inputs not well-formed.522  """523  num_source_devices = len(input_tensors)524  num_gather_devices = len(gather_devices)525  shape = input_tensors[0].shape526  if len(shape) != 1:527    raise ValueError("input_tensors must be 1D")528  shards_by_source = []529  for d in range(0, num_source_devices):530    with ops.colocate_with(input_tensors[d]):531      shards_by_source.append(532          _ragged_split(input_tensors[d], num_gather_devices))533  reduced_shards = []534  for d in range(0, num_gather_devices):535    with ops.device(gather_devices[d]):536      values = [s[d] for s in shards_by_source]537      red_shard = red_op(values)538      if un_op:539        red_shard = un_op(red_shard)540      reduced_shards.append(red_shard)541  return reduced_shards542def _build_shuffle_scatter(reduced_shards, dst_devices):543  """Build the scatter phase of shuffle all-reduce.544  Args:545    reduced_shards:  list of T @(tf.Tensor} fully reduced shards546    dst_devices: list of names of devices at which the fully-reduced value547      should be reconstituted.548  Returns:549    list of T @{tf.Tensor} scattered tensors.550  """551  num_devices = len(dst_devices)552  out_tensors = []553  for d in range(0, num_devices):554    with ops.device(dst_devices[d]):555      out_tensors.append(array_ops.concat(reduced_shards, 0))556  return out_tensors557def _split_by_task(devices, values):558  """Partition devices and values by common task.559  Args:560    devices: list of device name strings561    values: list of T @{tf.tensor} of same length as devices.562  Returns:563    (per_task_devices, per_task_values) where both values are564    lists of lists with isomorphic structure: the outer list is565    indexed by task, and the inner list has length of the number566    of values belonging to that task.  per_task_devices contains567    the specific devices to which the values are local, and568    per_task_values contains the corresponding values.569  Raises:570    ValueError: devices must be same length as values.571  """572  num_devices = len(devices)573  if num_devices != len(values):574    raise ValueError("len(devices) must equal len(values)")575  per_task_devices = collections.OrderedDict()576  per_task_values = collections.OrderedDict()577  for d in range(num_devices):578    d_spec = device_lib.DeviceSpec.from_string(devices[d])579    if not hasattr(d_spec, "task") or d_spec.task is None:580      assert False, "failed to parse device %s" % devices[d]581    index = (d_spec.job or "localhost", d_spec.replica or 0, d_spec.task)582    if index not in per_task_devices:583      per_task_devices[index] = []584      per_task_values[index] = []585    per_task_devices[index].append(devices[d])586    per_task_values[index].append(values[d])587  return (list(per_task_devices.values()), list(per_task_values.values()))588def build_nccl_all_reduce(input_tensors, red_op, un_op=None):589  """Build a subgraph that does one full all-reduce, using NCCL.590  Args:591    input_tensors: list of T @{tf.Tensor} of same-shape and type values to592      be reduced.593    red_op: binary elementwise reduction operator.  Must be one of594      {tf.add}595    un_op: optional unary elementwise Op to apply to fully-reduce values.596  Returns:597    list of T @{tf.Tensor} of reduced values.598  Raises:599    ValueError: red_op not supported.600  """601  if red_op == math_ops.add:602    output_tensors = nccl.all_sum(input_tensors)603  else:604    raise ValueError("red_op not supported by NCCL all-reduce: ", red_op)605  if un_op:606    un_op_wrapped = []607    for t in output_tensors:608      with ops.colocate_with(t):609        un_op_wrapped.append(un_op(t))610    output_tensors = un_op_wrapped611  return output_tensors612def _build_nccl_hybrid(input_tensors, red_op, upper_level_f):613  """Construct a subgraph for NCCL hybrid all-reduce.614  Args:615    input_tensors: list of T @{tf.Tensor} of same-shape and type values to616      be reduced.617    red_op: binary elementwise reduction operator.618    upper_level_f: function for reducing one value per worker, across619      workers.620  Returns:621    list of T @{tf.Tensor} of reduced values.622  Raises:623    ValueError: inputs not well-formed.624  """625  input_tensors, shape = _flatten_tensors(input_tensors)626  devices = [t.device for t in input_tensors]627  per_worker_devices, per_worker_values = _split_by_task(devices, input_tensors)628  num_workers = len(per_worker_devices)629  up_values = [None for w in range(0, num_workers)]630  up_devices = up_values[:]631  down_values = up_values[:]632  # First stage: reduce within each worker using NCCL633  for w in range(0, num_workers):634    worker_values = build_nccl_all_reduce(per_worker_values[w], red_op)635    # NOTE: these reductions will not run to completion unless636    # every output value is used.  Since we only need one, we637    # need to put control dependencies on the rest.638    with ops.control_dependencies(worker_values):639      with ops.device(worker_values[0].device):640        up_values[w] = array_ops.identity(worker_values[0])641      up_devices[w] = per_worker_devices[w][0]642  # Second stage: Apply upper_level_f to reduce across first device at643  # each worker644  level_2_output = upper_level_f(up_values)645  # Third stage: propagate within each worker using NCCL Broadcast646  for w in range(0, num_workers):647    dst_tensors = []648    with ops.device(per_worker_devices[w][0]):649      broadcast_src = nccl.broadcast(array_ops.identity(level_2_output[w]))650    for d in per_worker_devices[w]:651      with ops.device(d):652        dst_tensors.append(array_ops.identity(broadcast_src))653    down_values[w] = dst_tensors654  output_tensors = [v for sublist in down_values for v in sublist]655  if len(shape) != 1:656    output_tensors = _reshape_tensors(output_tensors, shape)657  return output_tensors658def _reduce_non_singleton(input_tensors, red_f, un_op):659  """If input_tensors has more than one element apply red_f, else apply un_op."""660  if len(input_tensors) > 1:661    return red_f(input_tensors)662  else:663    if not un_op:664      return input_tensors665    output_tensors = []666    for t in input_tensors:667      with ops.colocate_with(t):668        output_tensors.append(un_op(t))669    return output_tensors670def build_nccl_then_ring(input_tensors, subdiv, red_op, un_op=None):671  """Construct hybrid of NCCL within workers, Ring across workers."""672  def upper_builder(y):673    return build_ring_all_reduce(y, len(y), subdiv, [0], red_op, un_op)674  def upper_level_f(x):675    return _reduce_non_singleton(x, upper_builder, un_op)676  return _build_nccl_hybrid(input_tensors, red_op, upper_level_f)677def build_nccl_then_recursive_hd(input_tensors, red_op, un_op=None):678  """Construct hybrid of NCCL within workers, Recursive-HD across workers."""679  upper_level_f = lambda x: build_recursive_hd_all_reduce(x, red_op, un_op)680  return _build_nccl_hybrid(input_tensors, red_op, upper_level_f)681def build_nccl_then_shuffle(input_tensors, gather_devices, nccl_red_op,682                            shuffle_red_op, un_op=None):683  """Construct hybrid of NCCL within workers, Shuffle across workers."""684  upper_level_f = lambda x: build_shuffle_all_reduce(x, gather_devices,685                                                     shuffle_red_op, un_op)686  return _build_nccl_hybrid(input_tensors, nccl_red_op, upper_level_f)687def _build_shuffle_hybrid(input_tensors, gather_devices, red_op, upper_level_f):688  """Construct a subgraph for Shuffle hybrid all-reduce.689  Args:690    input_tensors: list of T @{tf.Tensor} of same-shape and type values to691      be reduced.692    gather_devices: list of device names on which to host gather shards.693    red_op: binary elementwise reduction operator.694    upper_level_f: function for reducing one value per worker, across695      workers.696  Returns:697    list of T @{tf.Tensor} of reduced values.698  Raises:699    ValueError: inputs not well-formed.700  """701  input_tensors, shape = _flatten_tensors(input_tensors)702  # First stage, reduce across each worker using gather_devices.703  devices = [t.device for t in input_tensors]704  per_worker_devices, per_worker_values = _split_by_task(devices, input_tensors)705  num_workers = len(per_worker_devices)706  up_values = []707  if len(gather_devices) != num_workers:708    raise ValueError("For shuffle hybrid, gather_devices must contain one "709                     "device per worker. ")710  for w in range(0, num_workers):711    reduced_shards = _build_shuffle_gather(712        per_worker_values[w], [gather_devices[w]], red_op)713    up_values.append(reduced_shards[0])714  # Second stage, apply upper_level_f.715  level_2_output = upper_level_f(up_values)716  # Third stage, apply shuffle scatter at each worker.717  output_tensors = []718  for w in range(0, num_workers):719    output_tensors += _build_shuffle_scatter(720        [level_2_output[w]], per_worker_devices[w])721  if len(shape) != 1:722    output_tensors = _reshape_tensors(output_tensors, shape)723  return output_tensors724def build_shuffle_then_ring(input_tensors, gather_devices, subdiv,725                            red_n_op, red_op, un_op=None):726  """Construct hybrid of Shuffle within workers, Ring across workers."""727  def upper_builder(tensors):728    return build_ring_all_reduce(tensors, len(tensors), subdiv, [0],729                                 red_op, un_op)730  def upper_level_f(tensors):731    return _reduce_non_singleton(tensors, upper_builder, un_op)732  return _build_shuffle_hybrid(733      input_tensors, gather_devices, red_n_op, upper_level_f)734def build_shuffle_then_shuffle(input_tensors, first_gather_devices,735                               second_gather_devices, red_op, un_op=None):736  """Construct hybrid of Shuffle within workers, Shuffle across workers."""737  def upper_builder(tensors):738    return build_shuffle_all_reduce(tensors, second_gather_devices,739                                    red_op, un_op)740  def upper_level_f(tensors):741    return _reduce_non_singleton(tensors, upper_builder, un_op)742  return _build_shuffle_hybrid(...app.py
Source:app.py  
1import paho.mqtt.client as mqtt2from flask import Flask,g,render_template,request,Response3from flask_cors import CORS4from flask_socketio import SocketIO, emit5from flask_apscheduler import APScheduler6from libDB import logDB,clearOldDB,queryDB7import numpy as np8import json9from libPickle import *10from pyimagesearch.motion_detection import SingleMotionDetector11from imutils.video import VideoStream12import threading13import argparse14import datetime15import imutils16import schedule17import time18import cv219#edit by chrome20"""21Date storage22- hot    : pickle devices data23- cold   :24- frozen : sqlite325""" 26class Config(object):27    SCHEDULER_API_ENABLED = True28    SECRET_KEY = 'secret!'29    threaded=True30    31scheduler = APScheduler()32# initialize the output frame and a lock used to ensure thread-safe33# exchanges of the output frames (useful for multiple browsers/tabs34# are viewing tthe stream)35outputFrame = None36lock = threading.Lock()37app = Flask(__name__)38app.config.from_object(Config())39CORS(app)40socketio = SocketIO(app)41# initialize the video stream and allow the camera sensor to42# warmup43#vs = VideoStream(usePiCamera=1).start()44vs = VideoStream(src=0).start()45# Create a dictionary called devices to store the device number, name, and device state:46devices = {47    'one' : {'floor' : 'fl1','position' : 'garage','object' : 'door', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl1/garage/door/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},48    'two' : {'floor' : 'fl1','position' : 'garage','object' : 'door', 'cmd' : '02', 'state' : 'False', 'type' : 'latch', 'topic' : 'fl1/garage/door/02', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},49    'three' : {'floor' : 'fl1','position' : 'com','object' : 'light', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl1/com/light/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},50    'four' : {'floor' : 'fl1','position' : 'com','object' : 'light', 'cmd' : '02', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl1/com/light/02', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},51    'five' : {'floor' : 'fl2','position' : 'liv','object' : 'fan', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl2/liv/fan/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},52    'six' : {'floor' : 'fl2','position' : 'cen','object' : 'light', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl2/cen/light/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},53    'seven' : {'floor' : 'fl3','position' : 'bed','object' : 'fan', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl3/bed/fan/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'}54    }55try:56    devices = load_pickle_obj('devices')57    # print(devices)58    print("devices exist")59except:60    save_pickle_obj(devices,'devices')61sensors = {62    'one' : {'name' : 'sensorName1'}63    }64jobCron = { 'startHour':'14',65            'startMinute':'00',66            'stopHour':'14',67            'stopMinute':'30'68    }69try:70    jobCron = load_pickle_obj('jobCron')71    # print(jobCron)72    print("jobCron exist")73except:74    save_pickle_obj(jobCron,'jobCron')75queryDatas = {'temp':{},76            'timeStamp':{},77            'humid':{}}78# Put the device dictionary into the template data dictionary:79templateData = {80    'devices' : devices,81    'sensors' : sensors,82    'jobCron': jobCron,83    'queryDatas':queryDatas84    }85@scheduler.task('cron', id='do_job_on', hour=jobCron['startHour'], minute=jobCron['startMinute'])86def jobOn():87    print('Job on executed')88    for device in devices:89        if devices[device]['type'] == 'on/off':90            if devices[device]['time_enable'] == 'True':91                devices[device]['state'] = 'True'92                mqtt_client.publish(devices[device]['topic'],"on")93            print(device+' '+devices[device]['time_enable'])94    try:95        save_pickle_obj(devices,'devices')96    except:97        print("error")98@scheduler.task('cron', id='do_job_off', hour=jobCron['stopHour'], minute=jobCron['stopMinute'])99def jobOff():100    print('Job off executed')101    for device in devices:102        if devices[device]['type'] == 'on/off':103            if devices[device]['time_enable'] == 'True':104                devices[device]['state'] = 'False'105                mqtt_client.publish(devices[device]['topic'],"off")106            print(device+' '+devices[device]['time_enable'])107    try:108        save_pickle_obj(devices,'devices')109    except:110        print("error")111@scheduler.task('cron', id='do_job_DB', minute='*')112def jobDB():113    print('Job DB executed')114    for device in devices:115        if devices[device]['th_enable'] == 'True':116            logDB(devices[device]['topic'],devices[device]['temp'],devices[device]['humid'],"DHT","true")117            try:118                save_pickle_obj(devices,'devices')119            except:120                print("error")121        122@scheduler.task('cron', id='do_job_ClrDB', day='*')123def jobClrDB():124    print('Job ClrDB executed')125    clearOldDB()126scheduler.init_app(app)127scheduler.start()128# The callback for when the client receives a CONNACK response from the server.129def on_connect(client, userdata, flags, rc):130    print("Connected with result code "+str(rc))131    # Subscribing in on_connect() means that if we lose the connection and132    # reconnect then subscriptions will be renewed.133    client.subscribe("fl1/#")#  + single-level wild card, # multi-level wild card, 134    client.subscribe("fl2/#")135    client.subscribe("fl3/#")136    client.subscribe("#")137    138def whx(ownval,owndicts):139    for x in owndicts:140        if (ownval) in owndicts[x].values():141            return x142    return 'none'143# The callback for when a PUBLISH message is received from the ESP8266.144def on_message(client, userdata, message):145    # socketio.emit('my variable')146    print("Received message '" + str(message.payload.decode('utf8')) + "' on topic '"+ message.topic + "' with QoS " + str(message.qos))147    if message.topic.startswith('fl'):148        topicMsgSplit = message.topic.split("/")149        slash_count = message.topic.count("/")150        if slash_count>=3:151            topicMsg = topicMsgSplit[0]+"/"+topicMsgSplit[1]+"/"+topicMsgSplit[2]+"/"+topicMsgSplit[3]152            indexName = whx(topicMsg,devices)153            devices_list = list(devices)154            n_index = devices_list.index(indexName)155            if "/temperature" in message.topic:156                devices[indexName]['th_enable'] = 'True'157                devices[indexName]['temp'] = str(message.payload.decode('utf8'))158                socketio.emit('temp' , {'data': devices[indexName]['temp'],'pos':n_index})159                #print("temperature update")160            if "/humidity" in message.topic:161                devices[indexName]['th_enable'] = 'True'162                devices[indexName]['humid'] = str(message.payload.decode('utf8'))163                socketio.emit('humid' , {'data': devices[indexName]['humid'],'pos':n_index})164                # print("humidity update")            165            if "/feedback" in message.topic:166                if "on" in str(message.payload.decode('utf8')):167                    if devices[indexName]['state'] == 'False':168                        devices[indexName]['state'] = 'True'169                        socketio.emit('refresh' , {})170                        # print("refresh")171                    172                else:173                    if devices[indexName]['state'] == 'True':174                        devices[indexName]['state'] = 'False'175                        socketio.emit('refresh' , {})176                        # print("refresh")177                # print("feedback update")178        else:179            print("mqtt message.topic error")180    elif message.topic.startswith('tele'):181        #print("tasmota")182        topicMsgSplit = message.topic.split("/")#[0]:tele,[1]:name,[2]:classify183        #topicMsg = topicMsgSplit[0]+"/"+topicMsgSplit[1]+"/"+topicMsgSplit[2]184        if topicMsgSplit[2] == "SENSOR":185            #print(topicMsgSplit[1])186            topic_json=json.dumps(str(message.payload.decode('utf8')))187            print(topic_json["ENERGY"])188            189    else:190        print("unknown topic: "+message.topic)191        192    try:193        save_pickle_obj(devices,'devices')194    except:195        print("error")196    197mqtt_client = mqtt.Client()198mqtt_client.on_connect = on_connect199mqtt_client.on_message = on_message200mqtt_client.connect("127.0.0.1",1883,60)201# mqtt_client.connect("192.168.2.46",1883,60)202mqtt_client.loop_start()203@app.route("/",methods=['GET', 'POST'])204def main():205    if request.method == 'GET':206        queryDatas['temp'] = queryDB("temp")207        queryDatas['humid'] = queryDB("humid")208        queryDatas['timeStamp'] = queryDB("timeStamp")209        # print(queryDatas['timeStamp'])210        templateData = {211            'devices' : devices,212            'sensors' : sensors,213            'jobCron': jobCron,214            'queryDatas':queryDatas215            }216        try:217            save_pickle_obj(devices,'devices')218        except:219            print("error")220        # Pass the template data into the template main.html and return it to the user221        return render_template('main.html', async_mode=socketio.async_mode, **templateData)222    elif request.method == 'POST':223        return 'post method do nothing'224    else:225        return 'method error'226@app.route("/debug")227def debug():228    templateData = {229        'devices' : devices,230        'sensors' : sensors,231        'jobCron': jobCron,232        'queryDatas':queryDatas233        }234    # Pass the template data into the template main.html and return it to the user235    return render_template('debug.html', async_mode=socketio.async_mode, **templateData)236# The function below is executed when someone requests a URL with the device number and action in it:237@app.route("/<device>/<floor>/<position>/<object>/<cmd>/<ctrl>",methods = ['POST', 'GET'])238def action(device, floor, position, object, cmd, ctrl):239    # Convert the device from the URL into an integer:240    #function = function#int(function)241    # Get the device name for the device being changed:242    # deviceName = devices[function]['name']243    # If the action part of the URL is "1" execute the code indented below:244    if ctrl == "on":245        mqtt_client.publish(devices[device]['topic'],"on")246        #print('mp '+devices[device]['topic'])247        devices[device]['state'] = 'True'248    # if ctrl == "0" and object == 'door':249    if ctrl == "off":250        mqtt_client.publish(devices[device]['topic'],"off")251        #print('mp '+devices[device]['topic'])252        devices[device]['state'] = 'False'253    if ctrl == "toggle":254        if devices[device]['state'] == 'True':255            mqtt_client.publish(devices[device]['topic'],"off")256            #print('mp '+devices[device]['topic'])257            devices[device]['state'] = 'False'258        else:   259            mqtt_client.publish(devices[device]['topic'],"on")260            #print('mp '+devices[device]['topic'])261            devices[device]['state'] = 'True'262    if ctrl == "click":263        mqtt_client.publish(devices[device]['topic'],"click")264        print('click '+devices[device]['topic'])265        devices[device]['state'] = 'False'266        267    # Along with the device dictionary, put the message into the template data dictionary:268    templateData = {269        'devices' : devices,270        'sensors' : sensors,271        'jobCron': jobCron,272        'queryDatas':queryDatas273    }274    try:275        save_pickle_obj(devices,'devices')276    except:277        print("error")278    return render_template('main.html', **templateData)279@app.route("/addSched/<device>")280def addSched(device):281    print('time_enable '+devices[device]['time_enable'])282    devices[device]['time_enable'] = 'True'283    templateData = {284        'devices' : devices,285        'sensors' : sensors,286        'jobCron': jobCron,287        'queryDatas':queryDatas288    }289    try:290        save_pickle_obj(devices,'devices')291    except:292        print("error")293    return render_template('main.html', **templateData)294    295    296@app.route("/rmSched/<device>")297def rmSched(device):298    print('time_enable '+devices[device]['time_enable'])299    devices[device]['time_enable'] = 'False'300    templateData = {301        'devices' : devices,302        'sensors' : sensors,303        'jobCron': jobCron,304        'queryDatas':queryDatas305    }306    try:307        save_pickle_obj(devices,'devices')308    except:309        print("error")310    return render_template('main.html', **templateData)311@app.route('/startTime',methods = ['POST', 'GET'])312def startTime():313    if request.method == 'POST':314        print(request.form)315        result1 = str(request.form['startTime1'])316        result2 = str(request.form['startTime2'])317        for job in scheduler.get_jobs():318            print(job.id)319        320        try:321            scheduler.scheduler.reschedule_job('do_job_on', trigger='cron', hour=result1, minute=result2)322            jobCron['startHour']=result1323            jobCron['startMinute']=result2324        except:325            pass326    templateData = {327        'devices' : devices,328        'sensors' : sensors,329        'jobCron': jobCron,330        'queryDatas':queryDatas331    }332    try:333        save_pickle_obj(devices,'devices')334        save_pickle_obj(jobCron,'jobCron')335    except:336        print("error")337    return render_template('main.html', **templateData)338        339@app.route('/stopTime',methods = ['POST', 'GET'])340def stopTime():341    if request.method == 'POST':342        result1 = str(request.form['stopTime1'])343        result2 = str(request.form['stopTime2'])344        for job in scheduler.get_jobs():345            print(job.id)346                347        try:348            scheduler.scheduler.reschedule_job('do_job_off', trigger='cron', hour=result1, minute=result2)349            jobCron['stopHour']=result1350            jobCron['stopMinute']=result2351        except:352            pass353        354    templateData = {355        'devices' : devices,356        'sensors' : sensors,357        'jobCron': jobCron,358        'queryDatas':queryDatas359    }360    try:361        save_pickle_obj(devices,'devices')362        save_pickle_obj(jobCron,'jobCron')363    except:364        print("error")365    return render_template('main.html', **templateData)366@app.route("/videoStream")367def videoStream():368    # return the rendered template369    return render_template("videoStream.html")370def detect_motion(frameCount):371    # grab global references to the video stream, output frame, and372    # lock variables373    global vs, outputFrame, lock374    # initialize the motion detector and the total number of frames375    # read thus far376    md = SingleMotionDetector(accumWeight=0.1)377    total = 0378    # loop over frames from the video stream379    while True:380        # read the next frame from the video stream, resize it,381        # convert the frame to grayscale, and blur it382        frame = vs.read()383        frame = imutils.resize(frame, width=400)384        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)385        gray = cv2.GaussianBlur(gray, (7, 7), 0)386        # grab the current timestamp and draw it on the frame387        timestamp = datetime.datetime.now()388        cv2.putText(frame, timestamp.strftime("%A %d %B %Y %I:%M:%S%p"), 389            (10, frame.shape[0] - 10),cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 255), 1)390        # if the total number of frames has reached a sufficient391        # number to construct a reasonable background model, then392        # continue to process the frame393        if total > frameCount:394            # detect motion in the image395            motion = md.detect(gray)396            # cehck to see if motion was found in the frame397            if motion is not None:398                # unpack the tuple and draw the box surrounding the399                # "motion area" on the output frame400                (thresh, (minX, minY, maxX, maxY)) = motion401                cv2.rectangle(frame, (minX, minY), (maxX, maxY),(0, 0, 255), 2)402        403        # update the background model and increment the total number404        # of frames read thus far405        md.update(gray)406        total += 1407        # acquire the lock, set the output frame, and release the408        # lock409        with lock:410            outputFrame = frame.copy()411            # outputFrame = gray.copy()412            413        414def generate():415    # grab global references to the output frame and lock variables416    global outputFrame, lock417    # loop over frames from the output stream418    while True:419        # wait until the lock is acquired420        with lock:421            # check if the output frame is available, otherwise skip422            # the iteration of the loop423            if outputFrame is None:424                continue425            # encode the frame in JPEG format426            (flag, encodedImage) = cv2.imencode(".jpg", outputFrame)427            # ensure the frame was successfully encoded428            if not flag:429                continue430        # yield the output frame in the byte format431        yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encodedImage) + b'\r\n')432@app.route("/video_feed")433def video_feed():434    # return the response generated along with the specific media435    # type (mime type)436    return Response(generate(),mimetype = "multipart/x-mixed-replace; boundary=frame")437        438@socketio.on('my event')439def handle_my_custom_event(json):440    #print('received json data here: ' + str(json))441    pass442# function for responses443def results():444    # build a request object445    req = request.get_json(silent=True, force=True)446    print("Request:")447    print(json.dumps(req, indent=4))448    # print(req,flush=True)449    # fetch fulfillmentText from json450    fulfillmentText = req.get('queryResult').get('fulfillmentText')451    # print(req,flush=True)452    # print(fulfillmentText, flush=True)453    mqtt_client.publish("fl1/garage/door/02","slide")454    # return a fulfillment response455    #return {'fulfillmentText': 'This is a response from webhook.'}456    return 'results'457@app.route('/webhook',methods=['GET','POST'])458def webhook():459    if request.method == 'POST':460        return results()461    else:462        return 'Hello'463if __name__ == "__main__":464    t = threading.Thread(target=detect_motion, args=(64,))465    t.daemon = True466    t.start()467    socketio.run(app, host='0.0.0.0', port=80, debug=True, use_reloader=False)468    469# release the video stream pointer...path.py
Source:path.py  
...158        List of devices ordered by coordinates159        """160        return sorted(self.devices, key=lambda dev: dev.md.z)161    @property162    def blocking_devices(self):163        """164        A list of devices that are currently inserted or are in unknown165        positions. This includes devices downstream of the first166        :attr:`.impediment`167        """168        # Cache important prior devices169        prior = None170        last_branches = list()171        block = list()172        for device in self.path:173            # If we have switched beamlines174            if prior and device.md.beamline != prior.md.beamline:175                # Find improperly configured optics176                for optic in last_branches:177                    # If this optic is responsible for delivering beam178                    # to this hutch and it is not configured to do so.179                    # Mark it as blocking180                    if device.md.beamline in optic.branches:181                        if device.md.beamline not in optic.destination:182                            block.append(optic)183                    # Otherwise ensure it is removed from the beamline184                    elif optic.md.beamline not in optic.destination:185                        block.append(optic)186                # Clear optics that have been evaluated187                last_branches.clear()188            # If our last device was an optic, make sure it wasn't required189            # to continue along this beampath190            elif (prior in last_branches191                    and device.md.beamline in prior.branches192                    and device.md.beamline not in prior.destination):193                block.append(last_branches.pop(-1))194            # Find branching devices and store195            # They will be marked as blocking by downstream devices196            dev_state = find_device_state(device)197            if device in self.branches:198                last_branches.append(device)199            # Find inserted devices200            elif dev_state == DeviceState.Inserted:201                # Ignore devices with low enough transmssion202                trans = getattr(device, 'transmission', 1)203                if trans < self.minimum_transmission:204                    block.append(device)205            # Find unknown and faulted devices206            elif dev_state != DeviceState.Removed:207                block.append(device)208            # Stache our prior device209            prior = device210        return block211    @property212    def incident_devices(self):213        """214        A list of devices the beam is currently incident on. This includes the215        current :attr:`.impediment` and any upstream devices that may be216        inserted but have more transmission than :attr:`.minimum_transmission`217        """218        # Find device information219        inserted = [d for d in self.path220                    if find_device_state(d) == DeviceState.Inserted]221        impediment = self.impediment222        # No blocking devices, all inserted devices incident223        if not impediment:224            return inserted225        # Otherwise only return upstream of the impediment226        return [d for d in inserted if d.md.z <= impediment.md.z]227    def show_devices(self, file=None):228        """229        Print a table of the devices along the beamline230        Parameters231        ----------232        file : file-like object233            File to writable234        """235        # Initialize Table236        pt = PrettyTable(['Name', 'Prefix', 'Position', 'Beamline', 'State'])237        # Adjust Table settings238        pt.align = 'r'239        pt.align['Name'] = 'l'240        pt.align['Prefix'] = 'l'241        pt.float_format = '8.5'...vscsi_util.py
Source:vscsi_util.py  
...136                        devname = sg137                    scsi_id = _vscsi_get_scsiid(sg)138            devices.append([hctl, devname, sg, scsi_id])139    return devices140def vscsi_get_scsidevices(mask="*"):141    """ get all scsi devices information """142    devices = _vscsi_get_scsidevices_by_lsscsi("[%s]" % mask)143    if devices or (len(mask) and mask[0] != "*"):144        # devices found or partial device scan145        return devices146    return _vscsi_get_scsidevices_by_sysfs()147def vscsi_get_hctl_and_devname_by(target, scsi_devices = None):148    if target.startswith('/dev/'):149        target = os.path.realpath(target)150    if scsi_devices is None:151        if len(target.split(':')) == 4:152            scsi_devices = _vscsi_get_scsidevices_by_lsscsi(target)153        elif target.startswith('/dev/'): 154            scsi_devices = _vscsi_get_scsidevices_by_lsscsi("| grep %s" % target)155        else:156            scsi_devices = _vscsi_get_scsidevices_by_lsscsi("")157        if not scsi_devices:158            scsi_devices = _vscsi_get_scsidevices_by_sysfs()159    if len(target.split(':')) == 4:160        return _vscsi_get_devname_by(target, scsi_devices)161    else:162        return _vscsi_get_hctl_by(target, scsi_devices)163def get_scsi_vendor(pHCTL):164    try:165        sysfs_mnt = utils.find_sysfs_mount() 166        sysfs_scsi_dev_path = \167            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)168        scsi_vendor = \169            os.popen('cat ' + sysfs_scsi_dev_path + \170                              SYSFS_SCSI_DEV_VENDOR_PATH).read()171        return scsi_vendor.splitlines()[0]172    except:173        return None174def get_scsi_model(pHCTL):175    try:176        sysfs_mnt = utils.find_sysfs_mount() 177        sysfs_scsi_dev_path = \178            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)179        scsi_model = \180            os.popen('cat ' + sysfs_scsi_dev_path + \181                              SYSFS_SCSI_DEV_MODEL_PATH).read()182        return scsi_model.splitlines()[0]183    except:184        return None185def get_scsi_typeid(pHCTL):186    try:187        sysfs_mnt = utils.find_sysfs_mount() 188        sysfs_scsi_dev_path = \189            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)190        scsi_typeid = \191            os.popen('cat ' + sysfs_scsi_dev_path + \192                              SYSFS_SCSI_DEV_TYPEID_PATH).read()193        return int(scsi_typeid.splitlines()[0])194    except:195        return None196def get_scsi_revision(pHCTL):197    try:198        sysfs_mnt = utils.find_sysfs_mount() 199        sysfs_scsi_dev_path = \200            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)201        scsi_revision = \202            os.popen('cat ' + sysfs_scsi_dev_path + \203                              SYSFS_SCSI_DEV_REVISION_PATH).read()204        return scsi_revision.splitlines()[0]205    except:206        return None207def get_scsi_scsilevel(pHCTL):208    try:209        sysfs_mnt = utils.find_sysfs_mount() 210        sysfs_scsi_dev_path = \211            os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)212        scsi_scsilevel = \213            os.popen('cat ' + sysfs_scsi_dev_path + \214                              SYSFS_SCSI_DEV_SCSILEVEL_PATH).read()215        return int(scsi_scsilevel.splitlines()[0])216    except:217        return None218def _make_scsi_record(scsi_info):219    scsi_rec = {220        'physical_HCTL': scsi_info[0],221        'dev_name': None,222        'sg_name': scsi_info[2],223        'scsi_id': None224    }225    if scsi_info[1] is not None:226        scsi_rec['dev_name'] = scsi_info[1] 227    if scsi_info[3] is not None:228        scsi_rec['scsi_id'] = scsi_info[3] 229    scsi_rec['vendor_name'] = \230        get_scsi_vendor(scsi_rec['physical_HCTL'])231    scsi_rec['model'] = \232        get_scsi_model(scsi_rec['physical_HCTL'])233    scsi_rec['type_id'] = \234        get_scsi_typeid(scsi_rec['physical_HCTL'])235    scsi_rec['revision'] = \236        get_scsi_revision(scsi_rec['physical_HCTL'])237    scsi_rec['scsi_level'] = \238        get_scsi_scsilevel(scsi_rec['physical_HCTL'])239    try:240        lsscsi_info = os.popen('lsscsi %s 2>/dev/null' % scsi_rec['physical_HCTL']).read().split()241        scsi_rec['type'] = lsscsi_info[1]242    except:243        scsi_rec['type'] = None244    return scsi_rec245def get_scsi_device(pHCTL):246    scsis_info = _vscsi_get_scsidevices_by_lsscsi(pHCTL)247    if not scsis_info:248        scsis_info = _vscsi_get_scsidevices_by_sysfs()249    for scsi_info in scsis_info:250        if scsi_info[0] == pHCTL:251            return _make_scsi_record(scsi_info)252    return None253def get_all_scsi_devices(mask="*"):254    scsi_records = []255    for scsi_info in vscsi_get_scsidevices(mask):256        scsi_record = _make_scsi_record(scsi_info)257        scsi_records.append(scsi_record)...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
