How to use _preConnect method in fMBT

Best Python code snippet using fMBT_python

git_config.py

Source:git_config.py Github

copy

Full Screen

1#2# Copyright (C) 2008 The Android Open Source Project3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15from __future__ import print_function16import contextlib17import errno18import json19import os20import re21import subprocess22import sys23try:24 import threading as _threading25except ImportError:26 import dummy_threading as _threading27import time28from pyversion import is_python329if is_python3():30 import urllib.request31 import urllib.error32else:33 import urllib234 import imp35 urllib = imp.new_module('urllib')36 urllib.request = urllib237 urllib.error = urllib238from signal import SIGTERM39from error import GitError, UploadError40from trace import Trace41if is_python3():42 from http.client import HTTPException43else:44 from httplib import HTTPException45from git_command import GitCommand46from git_command import ssh_sock47from git_command import terminate_ssh_clients48R_HEADS = 'refs/heads/'49R_TAGS = 'refs/tags/'50ID_RE = re.compile(r'^[0-9a-f]{40}$')51REVIEW_CACHE = dict()52def IsId(rev):53 return ID_RE.match(rev)54def _key(name):55 parts = name.split('.')56 if len(parts) < 2:57 return name.lower()58 parts[ 0] = parts[ 0].lower()59 parts[-1] = parts[-1].lower()60 return '.'.join(parts)61class GitConfig(object):62 _ForUser = None63 @classmethod64 def ForUser(cls):65 if cls._ForUser is None:66 cls._ForUser = cls(configfile = os.path.expanduser('~/.gitconfig'))67 return cls._ForUser68 @classmethod69 def ForRepository(cls, gitdir, defaults=None):70 return cls(configfile = os.path.join(gitdir, 'config'),71 defaults = defaults)72 def __init__(self, configfile, defaults=None, jsonFile=None):73 self.file = configfile74 self.defaults = defaults75 self._cache_dict = None76 self._section_dict = None77 self._remotes = {}78 self._branches = {}79 self._json = jsonFile80 if self._json is None:81 self._json = os.path.join(82 os.path.dirname(self.file),83 '.repo_' + os.path.basename(self.file) + '.json')84 def Has(self, name, include_defaults = True):85 """Return true if this configuration file has the key.86 """87 if _key(name) in self._cache:88 return True89 if include_defaults and self.defaults:90 return self.defaults.Has(name, include_defaults = True)91 return False92 def GetBoolean(self, name):93 """Returns a boolean from the configuration file.94 None : The value was not defined, or is not a boolean.95 True : The value was set to true or yes.96 False: The value was set to false or no.97 """98 v = self.GetString(name)99 if v is None:100 return None101 v = v.lower()102 if v in ('true', 'yes'):103 return True104 if v in ('false', 'no'):105 return False106 return None107 def GetString(self, name, all_keys=False):108 """Get the first value for a key, or None if it is not defined.109 This configuration file is used first, if the key is not110 defined or all_keys = True then the defaults are also searched.111 """112 try:113 v = self._cache[_key(name)]114 except KeyError:115 if self.defaults:116 return self.defaults.GetString(name, all_keys = all_keys)117 v = []118 if not all_keys:119 if v:120 return v[0]121 return None122 r = []123 r.extend(v)124 if self.defaults:125 r.extend(self.defaults.GetString(name, all_keys = True))126 return r127 def SetString(self, name, value):128 """Set the value(s) for a key.129 Only this configuration file is modified.130 The supplied value should be either a string,131 or a list of strings (to store multiple values).132 """133 key = _key(name)134 try:135 old = self._cache[key]136 except KeyError:137 old = []138 if value is None:139 if old:140 del self._cache[key]141 self._do('--unset-all', name)142 elif isinstance(value, list):143 if len(value) == 0:144 self.SetString(name, None)145 elif len(value) == 1:146 self.SetString(name, value[0])147 elif old != value:148 self._cache[key] = list(value)149 self._do('--replace-all', name, value[0])150 for i in range(1, len(value)):151 self._do('--add', name, value[i])152 elif len(old) != 1 or old[0] != value:153 self._cache[key] = [value]154 self._do('--replace-all', name, value)155 def GetRemote(self, name):156 """Get the remote.$name.* configuration values as an object.157 """158 try:159 r = self._remotes[name]160 except KeyError:161 r = Remote(self, name)162 self._remotes[r.name] = r163 return r164 def GetBranch(self, name):165 """Get the branch.$name.* configuration values as an object.166 """167 try:168 b = self._branches[name]169 except KeyError:170 b = Branch(self, name)171 self._branches[b.name] = b172 return b173 def GetSubSections(self, section):174 """List all subsection names matching $section.*.*175 """176 return self._sections.get(section, set())177 def HasSection(self, section, subsection = ''):178 """Does at least one key in section.subsection exist?179 """180 try:181 return subsection in self._sections[section]182 except KeyError:183 return False184 def UrlInsteadOf(self, url):185 """Resolve any url.*.insteadof references.186 """187 for new_url in self.GetSubSections('url'):188 for old_url in self.GetString('url.%s.insteadof' % new_url, True):189 if old_url is not None and url.startswith(old_url):190 return new_url + url[len(old_url):]191 return url192 @property193 def _sections(self):194 d = self._section_dict195 if d is None:196 d = {}197 for name in self._cache.keys():198 p = name.split('.')199 if 2 == len(p):200 section = p[0]201 subsect = ''202 else:203 section = p[0]204 subsect = '.'.join(p[1:-1])205 if section not in d:206 d[section] = set()207 d[section].add(subsect)208 self._section_dict = d209 return d210 @property211 def _cache(self):212 if self._cache_dict is None:213 self._cache_dict = self._Read()214 return self._cache_dict215 def _Read(self):216 d = self._ReadJson()217 if d is None:218 d = self._ReadGit()219 self._SaveJson(d)220 return d221 def _ReadJson(self):222 try:223 if os.path.getmtime(self._json) \224 <= os.path.getmtime(self.file):225 os.remove(self._json)226 return None227 except OSError:228 return None229 try:230 Trace(': parsing %s', self.file)231 fd = open(self._json)232 try:233 return json.load(fd)234 finally:235 fd.close()236 except (IOError, ValueError):237 os.remove(self._json)238 return None239 def _SaveJson(self, cache):240 try:241 fd = open(self._json, 'w')242 try:243 json.dump(cache, fd, indent=2)244 finally:245 fd.close()246 except (IOError, TypeError):247 if os.path.exists(self._json):248 os.remove(self._json)249 def _ReadGit(self):250 """251 Read configuration data from git.252 This internal method populates the GitConfig cache.253 """254 c = {}255 d = self._do('--null', '--list')256 if d is None:257 return c258 for line in d.decode('utf-8').rstrip('\0').split('\0'): # pylint: disable=W1401259 # Backslash is not anomalous260 if '\n' in line:261 key, val = line.split('\n', 1)262 else:263 key = line264 val = None265 if key in c:266 c[key].append(val)267 else:268 c[key] = [val]269 return c270 def _do(self, *args):271 command = ['config', '--file', self.file]272 command.extend(args)273 p = GitCommand(None,274 command,275 capture_stdout = True,276 capture_stderr = True)277 if p.Wait() == 0:278 return p.stdout279 else:280 GitError('git config %s: %s' % (str(args), p.stderr))281class RefSpec(object):282 """A Git refspec line, split into its components:283 forced: True if the line starts with '+'284 src: Left side of the line285 dst: Right side of the line286 """287 @classmethod288 def FromString(cls, rs):289 lhs, rhs = rs.split(':', 2)290 if lhs.startswith('+'):291 lhs = lhs[1:]292 forced = True293 else:294 forced = False295 return cls(forced, lhs, rhs)296 def __init__(self, forced, lhs, rhs):297 self.forced = forced298 self.src = lhs299 self.dst = rhs300 def SourceMatches(self, rev):301 if self.src:302 if rev == self.src:303 return True304 if self.src.endswith('/*') and rev.startswith(self.src[:-1]):305 return True306 return False307 def DestMatches(self, ref):308 if self.dst:309 if ref == self.dst:310 return True311 if self.dst.endswith('/*') and ref.startswith(self.dst[:-1]):312 return True313 return False314 def MapSource(self, rev):315 if self.src.endswith('/*'):316 return self.dst[:-1] + rev[len(self.src) - 1:]317 return self.dst318 def __str__(self):319 s = ''320 if self.forced:321 s += '+'322 if self.src:323 s += self.src324 if self.dst:325 s += ':'326 s += self.dst327 return s328_master_processes = []329_master_keys = set()330_ssh_master = True331_master_keys_lock = None332def init_ssh():333 """Should be called once at the start of repo to init ssh master handling.334 At the moment, all we do is to create our lock.335 """336 global _master_keys_lock337 assert _master_keys_lock is None, "Should only call init_ssh once"338 _master_keys_lock = _threading.Lock()339def _open_ssh(host, port=None):340 global _ssh_master341 # Acquire the lock. This is needed to prevent opening multiple masters for342 # the same host when we're running "repo sync -jN" (for N > 1) _and_ the343 # manifest <remote fetch="ssh://xyz"> specifies a different host from the344 # one that was passed to repo init.345 _master_keys_lock.acquire()346 try:347 # Check to see whether we already think that the master is running; if we348 # think it's already running, return right away.349 if port is not None:350 key = '%s:%s' % (host, port)351 else:352 key = host353 if key in _master_keys:354 return True355 if not _ssh_master \356 or 'GIT_SSH' in os.environ \357 or sys.platform in ('win32', 'cygwin'):358 # failed earlier, or cygwin ssh can't do this359 #360 return False361 # We will make two calls to ssh; this is the common part of both calls.362 command_base = ['ssh',363 '-o','ControlPath %s' % ssh_sock(),364 host]365 if port is not None:366 command_base[1:1] = ['-p', str(port)]367 # Since the key wasn't in _master_keys, we think that master isn't running.368 # ...but before actually starting a master, we'll double-check. This can369 # be important because we can't tell that that 'git@myhost.com' is the same370 # as 'myhost.com' where "User git" is setup in the user's ~/.ssh/config file.371 check_command = command_base + ['-O','check']372 try:373 Trace(': %s', ' '.join(check_command))374 check_process = subprocess.Popen(check_command,375 stdout=subprocess.PIPE,376 stderr=subprocess.PIPE)377 check_process.communicate() # read output, but ignore it...378 isnt_running = check_process.wait()379 if not isnt_running:380 # Our double-check found that the master _was_ infact running. Add to381 # the list of keys.382 _master_keys.add(key)383 return True384 except Exception:385 # Ignore excpetions. We we will fall back to the normal command and print386 # to the log there.387 pass388 command = command_base[:1] + \389 ['-M', '-N'] + \390 command_base[1:]391 try:392 Trace(': %s', ' '.join(command))393 p = subprocess.Popen(command)394 except Exception as e:395 _ssh_master = False396 print('\nwarn: cannot enable ssh control master for %s:%s\n%s'397 % (host,port, str(e)), file=sys.stderr)398 return False399 _master_processes.append(p)400 _master_keys.add(key)401 time.sleep(1)402 return True403 finally:404 _master_keys_lock.release()405def close_ssh():406 global _master_keys_lock407 terminate_ssh_clients()408 for p in _master_processes:409 try:410 os.kill(p.pid, SIGTERM)411 p.wait()412 except OSError:413 pass414 del _master_processes[:]415 _master_keys.clear()416 d = ssh_sock(create=False)417 if d:418 try:419 os.rmdir(os.path.dirname(d))420 except OSError:421 pass422 # We're done with the lock, so we can delete it.423 _master_keys_lock = None424URI_SCP = re.compile(r'^([^@:]*@?[^:/]{1,}):')425URI_ALL = re.compile(r'^([a-z][a-z+-]*)://([^@/]*@?[^/]*)/')426def GetSchemeFromUrl(url):427 m = URI_ALL.match(url)428 if m:429 return m.group(1)430 return None431@contextlib.contextmanager432def GetUrlCookieFile(url, quiet):433 if url.startswith('persistent-'):434 try:435 p = subprocess.Popen(436 ['git-remote-persistent-https', '-print_config', url],437 stdin=subprocess.PIPE, stdout=subprocess.PIPE,438 stderr=subprocess.PIPE)439 try:440 cookieprefix = 'http.cookiefile='441 proxyprefix = 'http.proxy='442 cookiefile = None443 proxy = None444 for line in p.stdout:445 line = line.strip()446 if line.startswith(cookieprefix):447 cookiefile = line[len(cookieprefix):]448 if line.startswith(proxyprefix):449 proxy = line[len(proxyprefix):]450 # Leave subprocess open, as cookie file may be transient.451 if cookiefile or proxy:452 yield cookiefile, proxy453 return454 finally:455 p.stdin.close()456 if p.wait():457 err_msg = p.stderr.read()458 if ' -print_config' in err_msg:459 pass # Persistent proxy doesn't support -print_config.460 elif not quiet:461 print(err_msg, file=sys.stderr)462 except OSError as e:463 if e.errno == errno.ENOENT:464 pass # No persistent proxy.465 raise466 yield GitConfig.ForUser().GetString('http.cookiefile'), None467def _preconnect(url):468 m = URI_ALL.match(url)469 if m:470 scheme = m.group(1)471 host = m.group(2)472 if ':' in host:473 host, port = host.split(':')474 else:475 port = None476 if scheme in ('ssh', 'git+ssh', 'ssh+git'):477 return _open_ssh(host, port)478 return False479 m = URI_SCP.match(url)480 if m:481 host = m.group(1)482 return _open_ssh(host)483 return False484class Remote(object):485 """Configuration options related to a remote.486 """487 def __init__(self, config, name):488 self._config = config489 self.name = name490 self.url = self._Get('url')491 self.review = self._Get('review')492 self.projectname = self._Get('projectname')493 self.fetch = list(map(RefSpec.FromString,494 self._Get('fetch', all_keys=True)))495 self._review_url = None496 def _InsteadOf(self):497 globCfg = GitConfig.ForUser()498 urlList = globCfg.GetSubSections('url')499 longest = ""500 longestUrl = ""501 for url in urlList:502 key = "url." + url + ".insteadOf"503 insteadOfList = globCfg.GetString(key, all_keys=True)504 for insteadOf in insteadOfList:505 if self.url.startswith(insteadOf) \506 and len(insteadOf) > len(longest):507 longest = insteadOf508 longestUrl = url509 if len(longest) == 0:510 return self.url511 return self.url.replace(longest, longestUrl, 1)512 def PreConnectFetch(self):513 connectionUrl = self._InsteadOf()514 return _preconnect(connectionUrl)515 def ReviewUrl(self, userEmail):516 if self._review_url is None:517 if self.review is None:518 return None519 u = self.review520 if u.startswith('persistent-'):521 u = u[len('persistent-'):]522 if u.split(':')[0] not in ('http', 'https', 'sso'):523 u = 'http://%s' % u524 if u.endswith('/Gerrit'):525 u = u[:len(u) - len('/Gerrit')]526 if u.endswith('/ssh_info'):527 u = u[:len(u) - len('/ssh_info')]528 if not u.endswith('/'):529 u += '/'530 http_url = u531 if u in REVIEW_CACHE:532 self._review_url = REVIEW_CACHE[u]533 elif 'REPO_HOST_PORT_INFO' in os.environ:534 host, port = os.environ['REPO_HOST_PORT_INFO'].split()535 self._review_url = self._SshReviewUrl(userEmail, host, port)536 REVIEW_CACHE[u] = self._review_url537 elif u.startswith('sso:'):538 self._review_url = u # Assume it's right539 REVIEW_CACHE[u] = self._review_url540 else:541 try:542 info_url = u + 'ssh_info'543 info = urllib.request.urlopen(info_url).read()544 if info == 'NOT_AVAILABLE' or '<' in info:545 # If `info` contains '<', we assume the server gave us some sort546 # of HTML response back, like maybe a login page.547 #548 # Assume HTTP if SSH is not enabled or ssh_info doesn't look right.549 self._review_url = http_url550 else:551 host, port = info.split()552 self._review_url = self._SshReviewUrl(userEmail, host, port)553 except urllib.error.HTTPError as e:554 raise UploadError('%s: %s' % (self.review, str(e)))555 except urllib.error.URLError as e:556 raise UploadError('%s: %s' % (self.review, str(e)))557 except HTTPException as e:558 raise UploadError('%s: %s' % (self.review, e.__class__.__name__))559 REVIEW_CACHE[u] = self._review_url560 return self._review_url + self.projectname561 def _SshReviewUrl(self, userEmail, host, port):562 username = self._config.GetString('review.%s.username' % self.review)563 if username is None:564 username = userEmail.split('@')[0]565 return 'ssh://%s@%s:%s/' % (username, host, port)566 def ToLocal(self, rev):567 """Convert a remote revision string to something we have locally.568 """569 if self.name == '.' or IsId(rev):570 return rev571 if not rev.startswith('refs/'):572 rev = R_HEADS + rev573 for spec in self.fetch:574 if spec.SourceMatches(rev):575 return spec.MapSource(rev)576 if not rev.startswith(R_HEADS):577 return rev578 raise GitError('remote %s does not have %s' % (self.name, rev))579 def WritesTo(self, ref):580 """True if the remote stores to the tracking ref.581 """582 for spec in self.fetch:583 if spec.DestMatches(ref):584 return True585 return False586 def ResetFetch(self, mirror=False):587 """Set the fetch refspec to its default value.588 """589 if mirror:590 dst = 'refs/heads/*'591 else:592 dst = 'refs/remotes/%s/*' % self.name593 self.fetch = [RefSpec(True, 'refs/heads/*', dst)]594 def Save(self):595 """Save this remote to the configuration.596 """597 self._Set('url', self.url)598 self._Set('review', self.review)599 self._Set('projectname', self.projectname)600 self._Set('fetch', list(map(str, self.fetch)))601 def _Set(self, key, value):602 key = 'remote.%s.%s' % (self.name, key)603 return self._config.SetString(key, value)604 def _Get(self, key, all_keys=False):605 key = 'remote.%s.%s' % (self.name, key)606 return self._config.GetString(key, all_keys = all_keys)607class Branch(object):608 """Configuration options related to a single branch.609 """610 def __init__(self, config, name):611 self._config = config612 self.name = name613 self.merge = self._Get('merge')614 r = self._Get('remote')615 if r:616 self.remote = self._config.GetRemote(r)617 else:618 self.remote = None619 @property620 def LocalMerge(self):621 """Convert the merge spec to a local name.622 """623 if self.remote and self.merge:624 return self.remote.ToLocal(self.merge)625 return None626 def Save(self):627 """Save this branch back into the configuration.628 """629 if self._config.HasSection('branch', self.name):630 if self.remote:631 self._Set('remote', self.remote.name)632 else:633 self._Set('remote', None)634 self._Set('merge', self.merge)635 else:636 fd = open(self._config.file, 'a')637 try:638 fd.write('[branch "%s"]\n' % self.name)639 if self.remote:640 fd.write('\tremote = %s\n' % self.remote.name)641 if self.merge:642 fd.write('\tmerge = %s\n' % self.merge)643 finally:644 fd.close()645 def _Set(self, key, value):646 key = 'branch.%s.%s' % (self.name, key)647 return self._config.SetString(key, value)648 def _Get(self, key, all_keys=False):649 key = 'branch.%s.%s' % (self.name, key)...

Full Screen

Full Screen

ifboard.py

Source:ifboard.py Github

copy

Full Screen

1import json2from logging import getLogger3import threading4import serial5from typing import Tuple, List6import numpy as np7def escape_nline_creturn(string):8 """Escape \n and \r in a string"""9 return string.replace('\n', '\\n').replace('\r', '\\r')10class SerialDevice:11 def __init__(self, port, baudrate=115200, timeout=0.1, name=None, terminator='\n', response_terminator=''):12 self.ser = None13 self.port = port14 self.baudrate = baudrate15 self.timeout = timeout16 self.name = name if name else self.port17 self.terminator = terminator18 self._response_terminator = response_terminator19 self._rlock = threading.RLock()20 def _preconnect(self):21 """22 Override to perform an action immediately prior to connection.23 Function should raise IOError if the serial device should not be opened.24 """25 pass26 def _postconnect(self):27 """28 Override to perform an action immediately after connection. Default is to sleep for twice the timeout29 Function should raise IOError if there are issues with the connection.30 Function will not be called if a connection can not be established or already exists.31 """32 pass33 def _predisconnect(self):34 """35 Override to perform an action immediately prior to disconnection.36 Function should raise IOError if the serial device should not be opened.37 """38 pass39 def connect(self, reconnect=False, raise_errors=True):40 """41 Connect to a serial port. If reconnect is True, closes the port first and then tries to reopen it. First asks42 the port if it is already open. If so, returns nothing and allows the calling function to continue on. If port43 is not already open, first attempts to create a serial.Serial object and establish the connection.44 Raises an IOError if the serial connection is unable to be established.45 """46 if reconnect:47 self.disconnect()48 try:49 if self.ser.isOpen():50 return51 except Exception:52 pass53 getLogger(__name__).debug(f"Connecting to {self.port} at {self.baudrate}")54 try:55 self._preconnect()56 self.ser = serial.Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout)57 self._postconnect()58 getLogger(__name__).debug(f"port {self.port} connection established")59 return True60 except (serial.SerialException, IOError) as e:61 getLogger(__name__).error(f"Connecting to port {self.port} failed: {e}, will attempt disconnect.")62 self.disconnect()63 if raise_errors:64 raise e65 return False66 def disconnect(self):67 """68 First closes the existing serial connection and then sets the ser attribute to None. If an exception occurs in69 closing the port, log the error but do not raise.70 """71 try:72 self._predisconnect()73 self.ser.close()74 self.ser = None75 except Exception as e:76 getLogger(__name__).info(f"Exception during disconnect: {e}")77 def format_msg(self, msg: str):78 """Subclass may implement to apply hardware specific formatting"""79 if msg and msg[-1] != self.terminator:80 msg = msg + self.terminator81 return msg.encode('utf-8')82 def send(self, msg: str, connect=True):83 """84 Send a message to a serial port. If connect is True, try to connect to the serial port before sending the85 message. Formats message according to the class's format_msg function before attempting to write to serial port.86 If IOError or SerialException occurs, first disconnect from the serial port, then log and raise the error.87 """88 with self._rlock:89 if connect:90 self.connect()91 try:92 msg = self.format_msg(msg)93 getLogger(__name__).debug(f"Sending '{msg}'")94 self.ser.write(msg)95 resp, success = self.receive()96 if not success:97 raise IOError(resp)98 return resp99 except (serial.SerialException, IOError) as e:100 self.disconnect()101 getLogger(__name__).error(f"...failed: {e}")102 raise e103 def receive(self):104 """105 Receive data until a timeout or a :/n or ?/n is recieved.106 received, decode it and strip it of the confirmation/rejection. In the case of a serialException,107 disconnects from the serial port and raises an IOError.108 returns a tuple any respponse (minus the confiramtion/rejection) and a boolean indicating109 confirmation/rejection110 """111 with self._rlock:112 try:113 lines = []114 more = True115 while more:116 lines.append(self.ser.readline().decode("utf-8"))117 if lines[-1].endswith('?\r\n') or lines[-1].endswith(':\r\n'):118 more = False119 response = ''.join(lines)120 getLogger(__name__).debug(f"Read {escape_nline_creturn(response)} from {self.name}")121 return response.strip()[:-1], response.endswith(':\r\n')122 except (IOError, serial.SerialException) as e:123 self.disconnect()124 getLogger(__name__).debug(f"Send failed {e}")125 raise IOError(e)126class IFBoard(SerialDevice):127 def __init__(self, port='/dev/ifboard', timeout=0.1, connect=True):128 super().__init__(port, 115200, timeout, response_terminator='\r\n')129 self.firmware = None130 self._monitor_thread = None131 self._initialized = False132 self.initialized_at_last_connect = False133 self.rebooted = None134 if connect:135 self.connect(raise_errors=False)136 def set_lo(self, freq_mhz, fractional: bool = True, full_calibration=True, g2_mode=False):137 try:138 freq_mhz = float(freq_mhz)139 assert 9600 > float(freq_mhz) > 0140 except (TypeError, AssertionError):141 raise ValueError('Frequency must be a float in (0, 9600)')142 self.send('G2' + ('T' if g2_mode else 'F'))143 self.send('FM' + ('T' if fractional else 'F'))144 self.send('CA' + ('T' if full_calibration else 'F'))145 resp = self.send(f'LO{freq_mhz}')146 if 'ERROR' in resp:147 getLogger(__name__).error(resp)148 raise ValueError(resp)149 def set_attens(self, output_attens: (float, Tuple[float], List[float], None) = None,150 input_attens: (float, Tuple[float], List[float], None) = None):151 """152 Set attenuator values. Values are set in the order of the signal path. None implies a value is not changed.153 A single number sets both attenuation values.154 When setting input_attens, the majority of the attenuation should be in the second attenuator.155 Raises IOError on com failure, ValueError on invalid value or setting failure.156 """157 current = [self.attens[v] for v in ('dac1', 'dac2', 'adc1', 'adc2')]158 output_attens = output_attens if isinstance(output_attens, (tuple, list)) else [output_attens] * 2159 if len(output_attens) != 2:160 raise ValueError('Incorrect number of output attenuations')161 input_attens = input_attens if isinstance(input_attens, (tuple, list)) else [input_attens] * 2162 if len(input_attens) != 2:163 raise ValueError('Incorrect number of input attenuations')164 new = output_attens + input_attens165 try:166 attens = [str(float(n if n is not None else c)) for n, c in zip(new, current)]167 except TypeError:168 raise ValueError('Attenuations must be float or None')169 self.send('AT' + ','.join(attens))170 @property171 def powered(self):172 powered = self.send('IO?')173 if powered == '1':174 return True175 elif powered == '0':176 return False177 else:178 raise IOError("Board did not adhere to protocol")179 @property180 def attens(self):181 """182 Returns a dict of attenuator values in dB: adc1, adc2, dac1, dac2183 These values are either in effect or will take effect when powered184 """185 atten = self.send('AT?')186 try:187 attens = json.loads(atten)188 except json.JSONDecodeError:189 raise IOError("IF board did not adhere to protocol")190 return attens191 def _postconnect(self):192 data = self.ser.readall().decode('utf-8')193 if 'Enter PC for cmd list' in data:194 self.rebooted = True195 self.ser.write('x\n'.encode('utf-8')) # a throwaway string just to stop any boot message196 self.ser.readall()197 @property198 def lo_freq(self):199 """Get the LO frequency in use while powered"""200 return float(self.send('LO?'))201 def save_settings(self):202 """Save current settings as power-on defaults"""203 self.send('SV')204 def power_off(self, save_settings=True):205 """Turn off IF board power, optionally saving the active settings"""206 self.send('IO0' + ('S' if save_settings else ''))207 def power_on(self):208 """Power up and load saved settings"""209 self.send('IO1')210 def raw(self, cmd):211 """Send a raw serial command to the IF board and return the response"""212 return self.send(cmd)213 def status(self):214 response = self.send('TS')215 getLogger(__name__).debug(response)216 lines = response.split('\n')217 lines = [l for l in lines if not l.startswith('#')]218 return IFStatus(''.join(lines))219class IFStatus:220 def __init__(self, jsonstr):221 self._data = d = json.loads(jsonstr)222 self.general = g = d['global']223 self.fresh_boot = not g['coms']224 self.boot = g['boot']225 self.power = g['power']226 self.trf_general = t = d['trf'][0]227 self.trf_regs = d['trf'][1:]228 self.dac_attens = (d['attens']['dac1'], d['attens']['dac2'])229 self.adc_attens = (d['attens']['adc1'], d['attens']['adc2'])230 partialcal = not (g['gen2'] or g['g3fcal'])231 s = 'LO gen{} {} mode, {} calibration. PLL {}locked.\n\tReq: {} MHz Attained: {} MHz Err: {} MHz'232 self.lo_mode = s.format('32'[g['gen2']], ('integer', 'fractional')[g['fract']], ('full', 'partial')[partialcal],233 ('un', '')[t['pll_locked']], g['lo'], t['f_LO'], (t['f_LO'] or np.nan) - g['lo'])234 def __str__(self):235 stat = 'IFStatus: {}, boot {}{}. {}\n\tDAC attens: {}\n\tADC Attens: {}'236 return stat.format('Powered' if self.power else 'Unpowered', self.boot,237 ', freshly booted' if self.fresh_boot else '',238 self.lo_mode, self.dac_attens, self.adc_attens)239# import logging240# logging.basicConfig()241# logging.getLogger('mkidgen3').setLevel('DEBUG')242# import mkidgen3.ifboard243# d='/dev/cu.usbmodem14401'...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful