How to use __control_file method in autotest

Best Python code snippet using autotest_python

advertisement_scanning.py

Source:advertisement_scanning.py Github

copy

Full Screen

#!/usr/bin/python3
# -*- mode: python; coding: utf-8 -*-
"""Bluetooth Low Energy (BLE) beacon advertisement and scanning.

Execution of a BLE beacon for use in BWSI PiPact independent project.
Configuration of beacon done via external YAML. Underlying functionality
provided by PyBluez module (https://github.com/pybluez/pybluez). Beacon
uses iBeacon format (https://en.wikipedia.org/wiki/IBeacon).
"""
import argparse
from bluetooth.ble import BeaconService
import copy
from datetime import datetime
from itertools import zip_longest
import logging
import logging.config
import pandas as pd
from pathlib import Path
import sys
import time
from uuid import uuid1
import yaml

# Default configuration
LOG_NAME = 'pi_pact.log'
DEFAULT_CONFIG = {
    'advertiser': {
        'control_file': "advertiser_control",
        'timeout': None,
        'uuid': '',
        'major': 1,
        'minor': 1,
        'tx_power': 1,
        'interval': 200
        },
    'scanner': {
        'control_file': "scanner_control",
        'scan_prefix': "pi_pact_scan",
        'timeout': None,
        'revisit': 1,
        'filters': {}
        },
    'logger': {
        'name': LOG_NAME,
        'config': {
            'version': 1,
            'formatters': {
                'full': {
                    'format': '%(asctime)s %(module)-10s %(levelname)-8s %(message)s'},
                'brief': {
                    'format': '%(asctime)s %(levelname)-8s %(message)s'},
                },
            'handlers': {
                'console': {
                    'class': 'logging.StreamHandler',
                    'level': 'INFO',
                    'formatter': 'brief'
                    },
                'file': {
                    'class': 'logging.handlers.TimedRotatingFileHandler',
                    'level': 'DEBUG',
                    'formatter': 'full',
                    'filename': LOG_NAME,
                    'when': 'H',
                    'interval': 1
                    }
                },
            'loggers': {
                LOG_NAME: {
                    'level': 'DEBUG',
                    'handlers': ['console', 'file']
                    }
                }
            }
        }
    }

# Universal settings
BLE_DEVICE = "hci0"
CONTROL_INTERVAL = 1  # (s)
MAX_TIMEOUT = 600  # (s)
ID_FILTERS = ['ADDRESS', 'UUID', 'MAJOR', 'MINOR', 'TX POWER']
MEASUREMENT_FILTERS = ['TIMESTAMP', 'RSSI']

# Limits
MAJOR_LIMITS = [1, 65535]
MINOR_LIMITS = [1, 65535]
TX_POWER_LIMITS = [-40, 4]
INTERVAL_LIMITS = [20, 10000]  # (ms)

ALLOWABLE_FILTERS = ID_FILTERS+MEASUREMENT_FILTERS


class Advertiser(object):
    """Instantiates a BLE beacon advertiser.

    Attributes:
        control_file (pathlib.Path): BLE beacon advertiser control file path.
        timeout (float, int): BLE beacon advertiser timeout (s). Must be
            strictly positive and no greater than MAX_TIMEOUT (600).
        uuid (str): BLE beacon advertiser UUID. Must be 32 hexadecimal digits
            split into 5 groups separated by hyphens. The number of digits in
            each group (from first to last) is {8, 4, 4, 4, 12}. Only the
            type is checked by the setter; an empty value is replaced by a
            freshly generated UUID.
        major (int): BLE beacon advertiser major value. Must be in [1, 65535].
        minor (int): BLE beacon advertiser minor value. Must be in [1, 65535].
        tx_power (int): BLE beacon advertiser TX power value. Must be in
            [-40, 4].
        interval (int): BLE beacon advertiser interval (ms) value. Must be in
            [20, 10000].
    """

    def __init__(self, logger, **kwargs):
        """Instance initialization.

        Args:
            logger (logging.Logger): Configured logger.
            **kwargs: Keyword arguments corresponding to instance attributes.
                Any unassociated keyword arguments are ignored. Falsy values
                (None, '', 0, {}) are treated as "not specified" and the
                default configuration value is used instead.
        """
        # Logger (must be set first, the property setters log through it)
        self.__logger = logger
        # Beacon settings; every assignment goes through a validating setter
        for key, value in DEFAULT_CONFIG['advertiser'].items():
            if key in kwargs and kwargs[key]:
                setattr(self, key, kwargs[key])
            else:
                self.__logger.debug("Using default beacon advertiser "
                        f"configuration {key}: {value}.")
                setattr(self, key, value)
        # Create beacon
        self.__service = BeaconService(BLE_DEVICE)
        self.__logger.info("Initialized beacon advertiser.")

    def __del__(self):
        """Instance destruction.

        Closes the control file handle (if open) and removes the control
        file. Tolerates instances whose initialization failed before the
        control file attributes were created, and a control file that has
        already been removed.
        """
        try:
            handle = self.__control_file_handle
            control_file = self.__control_file
        except AttributeError:
            # Initialization never got as far as the control file.
            return
        if handle is not None:
            handle.close()
        try:
            control_file.unlink()
        except FileNotFoundError:
            pass

    @property
    def control_file(self):
        """BLE beacon advertiser control file path getter."""
        return self.__control_file

    @control_file.setter
    def control_file(self, value):
        """BLE beacon advertiser control file path setter.

        Creates the file, makes it world read/write/executable and writes
        the "keep running" flag ("0") to it.

        Raises:
            TypeError: Beacon advertiser control file must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon advertiser control file must be a string.")
        self.__control_file = Path(value).resolve()
        self.__control_file.touch()
        self.__control_file.chmod(0o777)
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        self.__control_file_handle = None

    @property
    def timeout(self):
        """BLE beacon advertiser timeout getter."""
        return self.__timeout

    @timeout.setter
    def timeout(self, value):
        """BLE beacon advertiser timeout setter.

        Raises:
            TypeError: Beacon advertiser timeout must be a float, integer, or
                NoneType.
            ValueError: Beacon advertiser timeout must be strictly positive.
            ValueError: Beacon advertiser timeout cannot exceed maximum
                allowable timeout.
        """
        if value is not None:
            if not isinstance(value, (float, int)):
                raise TypeError("Beacon advertiser timeout must be a float, "
                        "integer, or NoneType.")
            elif value <= 0:
                raise ValueError("Beacon advertiser timeout must be strictly "
                        "positive.")
            elif value > MAX_TIMEOUT:
                raise ValueError("Beacon advertiser timeout cannot exceed "
                        "maximum allowable timeout.")
        self.__timeout = value

    @property
    def uuid(self):
        """BLE beacon advertiser UUID getter."""
        return self.__uuid

    @uuid.setter
    def uuid(self, value):
        """BLE beacon advertiser UUID setter.

        An empty string requests a newly generated (uuid1) UUID.

        Raises:
            TypeError: Beacon advertiser UUID must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon advertiser UUID must be a string.")
        elif not value:
            self.__uuid = str(uuid1())
            self.__logger.debug(f"Beacon advertiser UUID set to {self.__uuid}")
        else:
            self.__uuid = value

    @property
    def major(self):
        """BLE beacon advertiser major value getter."""
        return self.__major

    @major.setter
    def major(self, value):
        """BLE beacon advertiser major value setter.

        Raises:
            TypeError: Beacon advertiser major value must be an integer.
            ValueError: Beacon advertiser major value must be in [1, 65535].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser major value must be an integer.")
        elif value < MAJOR_LIMITS[0] or value > MAJOR_LIMITS[1]:
            raise ValueError("Beacon advertiser major value must be in range "
                    f"{MAJOR_LIMITS}.")
        self.__major = value

    @property
    def minor(self):
        """BLE beacon advertiser minor value getter."""
        return self.__minor

    @minor.setter
    def minor(self, value):
        """BLE beacon advertiser minor value setter.

        Raises:
            TypeError: Beacon advertiser minor value must be an integer.
            ValueError: Beacon advertiser minor value must be in [1, 65535].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser minor value must be an integer.")
        elif value < MINOR_LIMITS[0] or value > MINOR_LIMITS[1]:
            raise ValueError("Beacon advertiser minor value must be in range "
                    f"{MINOR_LIMITS}.")
        self.__minor = value

    @property
    def tx_power(self):
        """BLE beacon advertiser TX power value getter."""
        return self.__tx_power

    @tx_power.setter
    def tx_power(self, value):
        """BLE beacon advertiser TX power setter.

        Raises:
            TypeError: Beacon advertiser TX power must be an integer.
            ValueError: Beacon advertiser TX power must be in [-40, 4].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser TX power must be an integer.")
        elif value < TX_POWER_LIMITS[0] or value > TX_POWER_LIMITS[1]:
            raise ValueError("Beacon advertiser TX power must be in range "
                    f"{TX_POWER_LIMITS}.")
        self.__tx_power = value

    @property
    def interval(self):
        """BLE beacon advertiser interval getter."""
        return self.__interval

    @interval.setter
    def interval(self, value):
        """BLE beacon advertiser interval setter.

        Raises:
            TypeError: Beacon advertiser interval must be an integer.
            ValueError: Beacon advertiser interval must be in [20, 10000].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser interval must be an integer.")
        elif value < INTERVAL_LIMITS[0] or value > INTERVAL_LIMITS[1]:
            raise ValueError("Beacon advertiser interval must be in range "
                    f"{INTERVAL_LIMITS}.")
        self.__interval = value

    def advertise(self, timeout=0):
        """Execute BLE beacon advertisement.

        Advertising stops when the timeout elapses or when the control file
        content is anything other than "0". The control file is polled every
        CONTROL_INTERVAL seconds. Advertising is stopped and the control
        file reset even if polling raises.

        Args:
            timeout (int, float): Time (s) for which to advertise beacon. If
                specified as None then advertises till user commanded stop via
                control file. Defaults to configuration value (selected by
                the sentinel value 0).
        """
        # Parse inputs
        if timeout == 0:
            timeout = self.timeout
        # Update control file
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        # Start advertising
        self.__logger.info("Starting beacon advertiser with timeout "
                f"{timeout}.")
        self.__service.start_advertising(self.uuid, self.major, self.minor,
                self.tx_power, self.interval)
        try:
            # Stop advertising based on either timeout or control file
            start_time = time.monotonic()
            self.__control_file_handle = self.__control_file.open(mode='r+')
            run = True
            while run:
                time.sleep(CONTROL_INTERVAL)
                if timeout is not None:
                    if (time.monotonic()-start_time) > timeout:
                        self.__logger.debug("Beacon advertiser timed out.")
                        run = False
                self.__control_file_handle.seek(0)
                control_flag = self.__control_file_handle.read()
                if control_flag != "0":
                    self.__logger.debug("Beacon advertiser control flag set to "
                            "stop.")
                    run = False
        finally:
            # Always release the radio and the file handle, even on error
            self.__logger.info("Stopping beacon advertiser.")
            self.__service.stop_advertising()
            # Cleanup
            if self.__control_file_handle is not None:
                self.__control_file_handle.close()
                self.__control_file_handle = None
            with self.__control_file.open('w') as f:
                f.write("0")


class Scanner(object):
    """Instantiates a BLE beacon scanner.

    Attributes:
        control_file (pathlib.Path): BLE beacon scanner control file path.
        scan_prefix (str): Prefix of the scan output (CSV) file name.
        timeout (float, int): BLE beacon scanner timeout (s). Must be strictly
            positive and no greater than MAX_TIMEOUT (600).
        revisit (int): BLE beacon scanner revisit interval (s). Must be
            strictly positive.
        filters (dict): Filters to apply to received beacons. Allowable keys
            are {'ADDRESS', 'UUID', 'MAJOR', 'MINOR', 'TX POWER',
            'TIMESTAMP', 'RSSI'}. Identifier filters hold the value to match;
            measurement filters ('TIMESTAMP', 'RSSI') hold a two element
            [minimum, maximum] inclusive range.
    """

    def __init__(self, logger, **kwargs):
        """Instance initialization.

        Args:
            logger (logging.Logger): Configured logger.
            **kwargs: Keyword arguments corresponding to instance attributes.
                Any unassociated keyword arguments are ignored. Falsy values
                (None, '', 0, {}) are treated as "not specified" and the
                default configuration value is used instead.
        """
        # Logger
        self.__logger = logger
        # Beacon settings; every assignment goes through a validating setter
        for key, value in DEFAULT_CONFIG['scanner'].items():
            if key in kwargs and kwargs[key]:
                setattr(self, key, kwargs[key])
            else:
                self.__logger.debug("Using default beacon scanner "
                        f"configuration {key}: {value}.")
                setattr(self, key, value)
        # Create beacon
        self.__service = BeaconService(BLE_DEVICE)
        self.__logger.info("Initialized beacon scanner.")

    def __del__(self):
        """Instance destruction.

        Closes the control file handle (if open) and removes the control
        file. Tolerates instances whose initialization failed before the
        control file attributes were created, and a control file that has
        already been removed.
        """
        try:
            handle = self.__control_file_handle
            control_file = self.__control_file
        except AttributeError:
            # Initialization never got as far as the control file.
            return
        if handle is not None:
            handle.close()
        try:
            control_file.unlink()
        except FileNotFoundError:
            pass

    @property
    def control_file(self):
        """BLE beacon scanner control file path getter."""
        return self.__control_file

    @control_file.setter
    def control_file(self, value):
        """BLE beacon scanner control file path setter.

        Creates the file, makes it world read/write/executable and writes
        the "keep running" flag ("0") to it.

        Raises:
            TypeError: Beacon scanner control file must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon scanner control file must be a string.")
        # Initialize control file
        self.__control_file = Path(value).resolve()
        self.__control_file.touch()
        self.__control_file.chmod(0o777)
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        self.__control_file_handle = None

    @property
    def scan_prefix(self):
        """BLE beacon scanner scan file prefix getter."""
        return self.__scan_prefix

    @scan_prefix.setter
    def scan_prefix(self, value):
        """BLE beacon scanner scan file prefix setter.

        Raises:
            TypeError: Beacon scanner scan file prefix must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon scanner scan file prefix must be a string.")
        self.__scan_prefix = value

    @property
    def timeout(self):
        """BLE beacon scanner timeout getter."""
        return self.__timeout

    @timeout.setter
    def timeout(self, value):
        """BLE beacon scanner timeout setter.

        Raises:
            TypeError: Beacon scanner timeout must be a float, integer, or
                NoneType.
            ValueError: Beacon scanner timeout must be strictly positive.
            ValueError: Beacon scanner cannot exceed maximum allowable timeout.
        """
        if value is not None:
            if not isinstance(value, (float, int)):
                raise TypeError("Beacon scanner timeout must be a float, "
                        "integer, or NoneType.")
            elif value <= 0:
                raise ValueError("Beacon scanner timeout must be strictly "
                        "positive.")
            elif value > MAX_TIMEOUT:
                raise ValueError("Beacon scanner timeout cannot exceed "
                        "maximum allowable timeout.")
        self.__timeout = value

    @property
    def revisit(self):
        """BLE beacon scanner revisit interval getter."""
        return self.__revisit

    @revisit.setter
    def revisit(self, value):
        """BLE beacon scanner revisit interval setter.

        Raises:
            TypeError: Beacon scanner revisit interval must be an integer.
            ValueError: Beacon scanner revisit interval must be strictly
                positive.
        """
        if not isinstance(value, int):
            raise TypeError("Beacon scanner revisit interval must be an "
                    "integer.")
        elif value <= 0:
            raise ValueError("Beacon scanner revisit interval must strictly "
                    "positive.")
        self.__revisit = value

    @property
    def filters(self):
        """BLE beacon scanner filters getter."""
        return self.__filters

    @filters.setter
    def filters(self, value):
        """BLE beacon scanner filters setter.

        Raises:
            TypeError: Beacon scanner filters must be a dictionary.
            KeyError: Beacon scanner filters must be one of allowable filters.
        """
        if not isinstance(value, dict):
            raise TypeError("Beacon scanner filters must be a dictionary.")
        elif not all(key in ALLOWABLE_FILTERS for key in value):
            raise KeyError("Beacon scanner filters must be one of allowable "
                    f"filters {ALLOWABLE_FILTERS}.")
        self.__filters = value

    def filter_advertisements(self, advertisements):
        """Filter received beacon advertisements based on filters.

        Args:
            advertisements (pandas.DataFrame): Parsed advertisements.

        Returns:
            Advertisements with all entries that were not compliant with the
            filters removed, re-indexed from zero.
        """
        for key, value in self.filters.items():
            # Filter based on fixed identifiers (exact match)
            if key in ID_FILTERS:
                advertisements = advertisements[advertisements[key].isin([value])]
            # Filter based on measurements (inclusive [min, max] range)
            else:
                query_str = f"{value[0]} <= {key} and {key} <= {value[1]}"
                advertisements = advertisements.query(query_str)
        # Non-inplace reset: the frame may be a slice of the caller's frame
        return advertisements.reset_index(drop=True)

    def process_scans(self, scans, timestamps):
        """Process collection of received beacon advertisement scans.

        Organize collection of received beacon advertisement scans according
        to address, payload, and measurements.

        Args:
            scans (list): Received beacon advertisement scans. Each element
                contains all advertisements received from one scan, as a
                mapping from address to a payload indexable as
                (UUID, major, minor, TX power, RSSI). Elements are in
                temporal order.
            timestamps (list): Timestamps associated with each scan.

        Returns:
            Advertisements organized in a pandas.DataFrame by address first,
            timestamp second, and then remainder of advertisement payload,
            e.g., UUID, major, minor, etc.
        """
        # Collect all advertisements
        advertisements = []
        for (scan, timestamp) in zip_longest(scans, timestamps):
            for address, payload in scan.items():
                advertisement = {'ADDRESS': address, 'TIMESTAMP': timestamp}
                advertisement['UUID'] = payload[0]
                advertisement['MAJOR'] = payload[1]
                advertisement['MINOR'] = payload[2]
                advertisement['TX POWER'] = payload[3]
                advertisement['RSSI'] = payload[4]
                advertisements.append(advertisement)
        # Format into DataFrame
        return pd.DataFrame(advertisements, columns=['ADDRESS', 'TIMESTAMP',
                'UUID', 'MAJOR', 'MINOR', 'TX POWER', 'RSSI'])

    def scan(self, scan_prefix='', timeout=0, revisit=1):
        """Execute BLE beacon scan.

        Scanning stops when the timeout elapses or when the control file
        content is anything other than "0"; both are checked after each
        scan. The control file handle is released and the control file
        reset even if a scan raises.

        Args:
            scan_prefix (str): Scan output file prefix. Final output file name
                will be appended with first scan start timestamp. Defaults to
                configuration value (selected by the sentinel value '').
            timeout (int, float): Time (s) for which to scan. If specified as
                None then scans till user commanded stop via control file.
                Defaults to configuration value (selected by the sentinel
                value 0).
            revisit (int): Accepted for backward compatibility but not used;
                the interval between scans is always the configured
                ``revisit`` attribute of the instance.

        Returns:
            Filtered advertisements organized in a pandas.DataFrame by address
            first, timestamp second, and then remainder of advertisement
            payload, e.g., UUID, major, minor, etc.
        """
        # Parse inputs
        if scan_prefix == '':
            scan_prefix = self.scan_prefix
        if timeout == 0:
            timeout = self.timeout
        # Update control file and scan output file
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        scan_file = Path(f"{scan_prefix}_{datetime.now():%Y%m%dT%H%M%S}.csv")
        # Start scanning
        self.__logger.info(f"Starting beacon scanner with timeout {timeout}.")
        timestamps = []
        scans = []
        scan_count = 0
        try:
            self.__control_file_handle = self.__control_file.open(mode='r+')
            run = True
            start_time = time.monotonic()
            while run:
                scan_count += 1
                self.__logger.debug(f"Performing scan #{scan_count} at revisit "
                        f"{self.revisit}.")
                timestamps.append(datetime.now())
                scans.append(self.__service.scan(self.revisit))
                # Stop scanning based on either timeout or control file
                if timeout is not None:
                    if (time.monotonic()-start_time) > timeout:
                        self.__logger.debug("Beacon scanner timed out.")
                        run = False
                self.__control_file_handle.seek(0)
                control_flag = self.__control_file_handle.read()
                if control_flag != "0":
                    self.__logger.debug("Beacon scanner control flag set to stop.")
                    run = False
        finally:
            self.__logger.info("Stopping beacon scanner.")
            # Cleanup
            if self.__control_file_handle is not None:
                self.__control_file_handle.close()
                self.__control_file_handle = None
            with self.__control_file.open('w') as f:
                f.write("0")
        # Process, filter, and output received scans
        advertisements = self.process_scans(scans, timestamps)
        advertisements = self.filter_advertisements(advertisements)
        advertisements.to_csv(scan_file, index_label='SCAN')
        return advertisements


def setup_logger(config):
    """Setup and return logger based on configuration.

    Args:
        config (dict): Logger configuration with a 'config' entry passed to
            logging.config.dictConfig and a 'name' entry naming the logger.

    Returns:
        The configured logging.Logger.
    """
    logging.config.dictConfig(config['config'])
    return logging.getLogger(config['name'])


def close_logger(logger):
    """Close and detach every handler attached to the logger."""
    # Iterate over a copy since handlers are removed while looping
    for handler in logger.handlers[:]:
        handler.close()
        logger.removeHandler(handler)


def _is_valid_filter(key, value):
    """Return True if a scanner filter entry is well formed."""
    if key not in ALLOWABLE_FILTERS or value is None:
        return False
    if key in MEASUREMENT_FILTERS:
        # Measurement filters are [minimum, maximum] ranges
        try:
            return len(value) == 2
        except TypeError:
            return False
    return True


def load_config(parsed_args):
    """Load configuration.

    Loads beacon/scanner configuration from parsed input argument. Any
    expected keys not specified use values from default configuration.
    The module level DEFAULT_CONFIG is never modified.

    Args:
        parsed_args (dict): Parsed input arguments as returned by parse_args.

    Returns:
        Configuration dictionary.
    """
    # Work on a private copy so overrides never leak into the defaults
    defaults = copy.deepcopy(DEFAULT_CONFIG)
    # Load default configuration if none specified
    if parsed_args['config_yml'] is None:
        config = defaults
    # Load configuration YAML
    else:
        with open(parsed_args['config_yml'], 'r') as f:
            # An empty YAML document loads as None
            config = yaml.load(f, Loader=yaml.SafeLoader) or {}
        for section in ('advertiser', 'scanner'):
            config[section] = {**defaults[section],
                    **(config.get(section) or {})}
        if not config.get('logger'):
            config['logger'] = defaults['logger']
    # Merge configuration values with command line options
    for key, value in parsed_args.items():
        if value is not None:
            if key in config['advertiser']:
                config['advertiser'][key] = value
            if key in config['scanner']:
                config['scanner'][key] = value
    # Remove malformed filters
    if config['scanner']['filters'] is not None:
        config['scanner']['filters'] = {
                key: value
                for key, value in config['scanner']['filters'].items()
                if _is_valid_filter(key, value)}
    return config


def parse_args(args):
    """Input argument parser.

    Args:
        args (list): Input arguments as taken from sys.argv.

    Returns:
        Dictionary containing parsed input arguments. Keys are argument names.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description=("BLE beacon advertiser or scanner. Command line "
                     "arguments will override their corresponding value in "
                     "a configuration file if specified."))
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument('-a', '--advertiser', action='store_true',
                            help="Beacon advertiser mode.")
    mode_group.add_argument('-s', '--scanner', action='store_true',
                            help="Beacon scanner mode.")
    parser.add_argument('--config_yml', help="Configuration YAML.")
    parser.add_argument('--control_file', help="Control file.")
    parser.add_argument('--scan_prefix', help="Scan output file prefix.")
    parser.add_argument('--timeout', type=float,
            help="Timeout (s) for both beacon advertiser and scanner modes.")
    parser.add_argument('--uuid', help="Beacon advertiser UUID.")
    parser.add_argument('--major', type=int,
            help="Beacon advertiser major value.")
    parser.add_argument('--minor', type=int,
            help="Beacon advertiser minor value.")
    parser.add_argument('--tx_power', type=int,
            help="Beacon advertiser TX power.")
    parser.add_argument('--interval', type=int,
            help="Beacon advertiser interval (ms).")
    parser.add_argument('--revisit', type=int,
            help="Beacon scanner revisit interval (s)")
    return vars(parser.parse_args(args))


def main(args):
    """Creates beacon and either starts advertising or scanning.

    Args:
        args (list): Arguments as provided by sys.argv.

    Returns:
        If advertising then no output (None) is returned. If scanning
        then scanned advertisements are returned in pandas.DataFrame.
        None is also returned when a fatal exception was logged.
    """
    # Initial setup
    parsed_args = parse_args(args)
    config = load_config(parsed_args)
    logger = setup_logger(config['logger'])
    logger.debug(f"Beacon configuration - {config['advertiser']}")
    logger.debug(f"Scanner configuration - {config['scanner']}")

    # Create and start beacon advertiser or scanner
    # Bound up front so the return below is valid after a logged exception
    output = None
    try:
        if parsed_args['advertiser']:
            logger.info("Beacon advertiser mode selected.")
            advertiser = Advertiser(logger, **config['advertiser'])
            advertiser.advertise()
        elif parsed_args['scanner']:
            logger.info("Beacon scanner mode selected.")
            scanner = Scanner(logger, **config['scanner'])
            output = scanner.scan()
    except Exception:
        logger.exception("Fatal exception encountered")
    finally:
        close_logger(logger)
    return output


if __name__ == "__main__":
    # Script execution
    main(sys.argv[1:])

Full Screen

Full Screen

pi_pact.py

Source:pi_pact.py Github

copy

Full Screen

1#!/usr/bin/python32# -*- mode: python; coding: utf-8 -*-3"""Bluetooth Low Energy (BLE) beacon advertisement and scanning.4Execution of a BLE beacon for use in BWSI PiPact independent project. 5Configuration of beacon done via external YAML. Underlying functionality 6provided by PyBluez module (https://github.com/pybluez/pybluez). Beacon 7uses iBeacon format (https://en.wikipedia.org/wiki/IBeacon).8"""9import argparse10from bluetooth.ble import BeaconService11from datetime import datetime12from itertools import zip_longest13import logging14import logging.config15import pandas as pd16from pathlib import Path17import sys18import time19from uuid import uuid120import yaml21# Default configuration22LOG_NAME = 'pi_pact.log'23DEFAULT_CONFIG = {24 'advertiser': {25 'control_file': "advertiser_control",26 'timeout': None,27 'uuid': '',28 'major': 1,29 'minor': 1,30 'tx_power': 1,31 'interval': 20032 },33 'scanner': {34 'control_file': "scanner_control",35 'scan_prefix': "pi_pact_scan",36 'timeout': None,37 'revisit': 1,38 'filters': {}39 },40 'logger': {41 'name': LOG_NAME,42 'config': {43 'version': 1,44 'formatters': {45 'full': {46 'format': '%(asctime)s %(module)-10s %(levelname)-8s %(message)s'},47 'brief': {48 'format': '%(asctime)s %(levelname)-8s %(message)s'},49 },50 'handlers': {51 'console': {52 'class': 'logging.StreamHandler',53 'level': 'INFO',54 'formatter': 'brief'55 },56 'file': {57 'class': 'logging.handlers.TimedRotatingFileHandler',58 'level': 'DEBUG',59 'formatter': 'full',60 'filename': LOG_NAME,61 'when': 'H',62 'interval': 163 }64 },65 'loggers': {66 LOG_NAME: {67 'level': 'DEBUG',68 'handlers': ['console', 'file']69 }70 }71 }72 }73 }74# Universal settings75BLE_DEVICE = "hci0"76CONTROL_INTERVAL = 1 # (s)77MAX_TIMEOUT = 600 # (s)78ID_FILTERS = ['ADDRESS', 'UUID', 'MAJOR', 'MINOR', 'TX POWER']79MEASUREMENT_FILTERS = ['TIMESTAMP', 'RSSI']80# Limits81MAJOR_LIMITS = [1, 65535]82MINOR_LIMITS = [1, 65535]83TX_POWER_LIMITS = [-40, 4]84INTERVAL_LIMITS = [20, 
10000] # (ms)85ALLOWABLE_FILTERS = ID_FILTERS+MEASUREMENT_FILTERS86class Advertiser(object):87 """Instantiates a BLE beacon advertiser.88 89 Attributes:90 control_file (pathlib.Path): BLE beacon advertiser control file path.91 timeout (float, int): BLE beacon advertiser timeout (s). Must be 92 strictly positive and less than 600.93 uuid (str): BLE beacon advertiser UUID. Must be 32 hexadecimal digits 94 split into 5 groups separated by hyphens. The number of digits in 95 each group from first to last) is {8, 4, 4, 4, 12}.96 major (int): BLE beacon advertiser major value. Must be in [1, 65535].97 minor (int): BLE beacon advertiser minor value. Must be in [1, 65535].98 tx_power (int): BLE beacon advertiser TX power value. Must be in 99 [-40, 4].100 interval (int): BLE beacon advertiser interval (ms) value. Must be in 101 [20, 10000].102 """103 def __init__(self, logger, **kwargs):104 """Instance initialization.105 Args:106 logger (logging.Logger): Configured logger.107 **kwargs: Keyword arguments corresponding to instance attributes. 
108 Any unassociated keyword arguments are ignored.109 """110 # Logger111 self.__logger = logger112 # Beacon settings113 for key, value in DEFAULT_CONFIG['advertiser'].items():114 if key in kwargs and kwargs[key]:115 setattr(self, key, kwargs[key])116 else:117 self.__logger.debug("Using default beacon advertiser "118 f"configuration {key}: {value}.")119 setattr(self, key, value)120 # Create beacon121 self.__service = BeaconService(BLE_DEVICE)122 self.__logger.info("Initialized beacon advertiser.")123 124 def __del__(self):125 """Instance destruction."""126 if self.__control_file_handle is not None:127 self.__control_file_handle.close()128 self.__control_file.unlink()129 130 @property131 def control_file(self):132 """BLE beacon advertiser control file path getter."""133 return self.__control_file134 135 @control_file.setter136 def control_file(self, value):137 """BLE beacon advertiser control file path setter.138 139 Raises:140 TypeError: Beacon advertiser control file must be a string.141 """142 if not isinstance(value, str):143 raise TypeError("Beacon advertiser control file must be a string.")144 else:145 self.__control_file = Path(value).resolve()146 self.__control_file.touch()147 self.__control_file.chmod(0o777)148 with self.__control_file.open(mode='w') as f:149 f.write("0")150 self.__control_file_handle = None151 152 @property153 def timeout(self):154 """BLE beacon advertiser timeout getter."""155 return self.__timeout;156 157 @timeout.setter158 def timeout(self, value):159 """BLE beacon advertiser timeout setter.160 Raises:161 TypeError: Beacon advertiser timeout must be a float, integer, or 162 NoneType.163 ValueError: Beacon advertiser timeout must be strictly positive.164 ValueError: Beacon advertisertimeout cannot exceed maximum 165 allowable timeout.166 """167 if value is not None:168 if not isinstance(value, (float, int)):169 raise TypeError("Beacon advertiser timeout must be a float, "170 "integer, or NoneType.")171 elif value <= 0:172 raise 
ValueError("Beacon advertiser timeout must be strictly "173 "positive.")174 elif value > MAX_TIMEOUT:175 raise ValueError("Beacon advertiser timeout cannot exceed "176 "maximum allowable timeout.")177 self.__timeout = value178 179 @property180 def uuid(self):181 """BLE beacon advertiser UUID getter."""182 return self.__uuid;183 @uuid.setter184 def uuid(self, value):185 """BLE beacon advertiser UUID setter.186 Raises:187 TypeError: Beacon advertiser UUID must be a string.188 """189 if not isinstance(value, str):190 raise TypeError("Beacon advertiser UUID must be a string.")191 elif not value:192 self.__uuid = str(uuid1())193 self.__logger.debug(f"Beacon advertiser UUID set to {self.__uuid}")194 else:195 self.__uuid = value;196 @property197 def major(self):198 """BLE beacon advertiser major value getter."""199 return self.__major200 @major.setter201 def major(self, value):202 """BLE beacon advertiser major value setter.203 Raises:204 TypeError: Beacon advertiser major value must be an integer.205 ValueError: Beacon advertiser major value must be in [1, 65535].206 """207 if not isinstance(value, int):208 raise TypeError("Beacon advertiser major value must be an integer.")209 elif value < MAJOR_LIMITS[0] or value > MAJOR_LIMITS[1]:210 raise ValueError("Beacon advertiser major value must be in range "211 f"{MAJOR_LIMITS}.")212 self.__major = value213 214 @property215 def minor(self):216 """BLE beacon advertiser minor value getter."""217 return self.__minor218 @minor.setter219 def minor(self, value):220 """BLE beacon advertiser minor value setter.221 Raises:222 TypeError: Beacon advertiser minor value must be an integer.223 ValueError: Beacon advertiser minor value must be in [1, 65535].224 """225 if not isinstance(value, int):226 raise TypeError("Beacon advertiser minor value must be an integer.")227 elif value < MINOR_LIMITS[0] or value > MINOR_LIMITS[1]:228 raise ValueError("Beacon advertiser minor value must be in range "229 f"{MINOR_LIMITS}.")230 self.__minor = 
value231 @property232 def tx_power(self):233 """BLE beacon advertiser TX power value getter."""234 return self.__tx_power235 @tx_power.setter236 def tx_power(self, value):237 """BLE beacon Beacon advertiser TX power setter.238 Raises:239 TypeError: Beacon advertiser TX power must be an integer.240 ValueError: Beacon advertiser TX power must be in [-40, 4].241 """242 if not isinstance(value, int):243 raise TypeError("Beacon advertiser TX power must be an integer.")244 elif value < TX_POWER_LIMITS[0] or value > TX_POWER_LIMITS[1]:245 raise ValueError("Beacon advertiser TX power must be in range "246 f"{TX_POWER_LIMITS}.")247 self.__tx_power = value248 @property249 def interval(self):250 """BLE beacon advertiser interval getter."""251 return self.__interval252 @interval.setter253 def interval(self, value):254 """BLE beacon advertiser interval setter.255 Raises:256 TypeError: Beacon advertiser interval must be an integer.257 ValueError: Beacon advertiser interval must be in [20, 10000].258 """259 if not isinstance(value, int):260 raise TypeError("Beacon advertiser interval must be an integer.")261 elif value < INTERVAL_LIMITS[0] or value > INTERVAL_LIMITS[1]:262 raise ValueError("Beacon advertiser interval must be in range "263 f"{INTERVAL_LIMITS}.")264 self.__interval = value265 266 def advertise(self, timeout=0):267 """Execute BLE beacon advertisement.268 269 Args:270 timeout (int, float): Time (s) for which to advertise beacon. If271 specified as None then advertises till user commanded stop via272 control file. 
Defaults to configuration value.273 """274 # Parse inputs275 if timeout == 0:276 timeout = self.timeout277 # Update control file278 with self.__control_file.open(mode='w') as f:279 f.write("0")280 # Start advertising281 self.__logger.info("Starting beacon advertiser with timeout "282 f"{timeout}.")283 self.__service.start_advertising(self.uuid, self.major, self.minor,284 self.tx_power, self.interval)285 # Stop advertising based on either timeout or control file286 start_time = time.monotonic()287 self.__control_file_handle = self.__control_file.open(mode='r+')288 run = True289 while run:290 time.sleep(CONTROL_INTERVAL)291 if timeout is not None:292 if (time.monotonic()-start_time) > timeout:293 self.__logger.debug("Beacon advertiser timed out.")294 run = False295 self.__control_file_handle.seek(0)296 control_flag = self.__control_file_handle.read()297 if control_flag != "0":298 self.__logger.debug("Beacon advertiser control flag set to "299 "stop.")300 run = False301 self.__logger.info("Stopping beacon advertiser.") 302 self.__service.stop_advertising()303 # Cleanup304 self.__control_file_handle.close()305 with self.__control_file.open('w') as f:306 f.write("0")307 308class Scanner(object):309 """Instantiates a BLE beacon scanner.310 311 Attributes:312 control_file (pathlib.Path): BLE beacon scanner control file path.313 timeout (float, int): BLE beacon scanner timeout (s). Must be strictly314 positive and less than 600.315 revisit (int): BLE beacon scanner revisit interval (s). Must be 316 strictly positive.317 filters (dict): Filters to apply to received beacons. Available318 filters/keys are {'address', 'uuid', 'major', 'minor'}.319 """320 def __init__(self, logger, **kwargs):321 """Instance initialization.322 Args:323 logger (logging.Logger): Configured logger.324 **kwargs: Keyword arguments corresponding to instance attributes. 
325 Any unassociated keyword arguments are ignored.326 """327 # Logger328 self.__logger = logger329 # Beacon settings330 for key, value in DEFAULT_CONFIG['scanner'].items():331 if key in kwargs and kwargs[key]:332 setattr(self, key, kwargs[key])333 else:334 self.__logger.debug("Using default beacon scanner "335 f"configuration {key}: {value}.")336 setattr(self, key, value)337 # Create beacon338 self.__service = BeaconService(BLE_DEVICE)339 self.__logger.info("Initialized beacon scanner.")340 341 def __del__(self):342 """Instance destruction."""343 if self.__control_file_handle is not None:344 self.__control_file_handle.close()345 self.__control_file.unlink()346 347 @property348 def control_file(self):349 """BLE beacon scanner control file path getter."""350 return self.__control_file351 352 @control_file.setter353 def control_file(self, value):354 """BLE beacon scanner control file path setter.355 356 Raises:357 TypeError: Beacon scanner control file must be a string.358 """359 if not isinstance(value, str):360 raise TypeError("Beacon scanner control file must be a string.")361 # Initialize control file362 self.__control_file = Path(value).resolve()363 self.__control_file.touch()364 self.__control_file.chmod(0o777)365 with self.__control_file.open(mode='w') as f:366 f.write("0")367 self.__control_file_handle = None368 @property369 def scan_prefix(self):370 """BLE beacon scanner scan file prefix getter."""371 return self.__scan_prefix372 373 @scan_prefix.setter374 def scan_prefix(self, value):375 """BLE beacon scanner scan file prefix setter.376 377 Raises:378 TypeError: Beacon scanner scan file prefix must be a string.379 """380 if not isinstance(value, str):381 raise TypeError("Beacon scanner scan file prefix must be a string.")382 self.__scan_prefix = value383 384 @property385 def timeout(self):386 """BLE beacon scanner timeout getter."""387 return self.__timeout;388 389 @timeout.setter390 def timeout(self, value):391 """BLE beacon scanner timeout setter.392 
Raises:393 TypeError: Beacon scanner timeout must be a float, integer, or 394 NoneType.395 ValueError: Beacon scanner timeout must be strictly positive.396 ValueError: Beacon scanner cannot exceed maximum allowable timeout.397 """398 if value is not None:399 if not isinstance(value, (float, int)):400 raise TypeError("Beacon scanner timeout must be a float, "401 "integer, or NoneType.")402 elif value <= 0:403 raise ValueError("Beacon scanner timeout must be strictly "404 "positive.")405 elif value > MAX_TIMEOUT:406 raise ValueError("Beacon scanner timeout cannot exceed "407 "maximum allowable timeout.")408 self.__timeout = value409 410 @property411 def revisit(self):412 """BLE beacon scanner revisit interval getter."""413 return self.__revisit414 @revisit.setter415 def revisit(self, value):416 """BLE beacon scanner revisit interval setter.417 Raises:418 TypeError: Beacon scanner revisit interval must be an integer.419 ValueError: Beacon scanner revisit interval must be strictly 420 positive.421 """422 if not isinstance(value, int):423 raise TypeError("Beacon scanner revisit interval must be an "424 "integer.")425 elif value <= 0:426 raise ValueError("Beacon scanner revisit interval must strictly "427 "positive.")428 self.__revisit = value429 430 @property431 def filters(self):432 """BLE beacon scanner filters getter."""433 return self.__filters434 435 @filters.setter436 def filters(self, value):437 """BLE beacon scanner filters setter.438 Raises:439 TypeError: Beacon scanner filters must be a dictionary.440 KeyError: Beacon scanner filters must be one of allowable filters.441 """442 if not isinstance(value, dict):443 raise TypeError("Beacon scanner filters must be a dictionary.")444 elif not all([key in ALLOWABLE_FILTERS for key in value.keys()]):445 raise KeyError("Beacon scanner filters must be one of allowable "446 f"filters {ALLOWABLE_FILTERS}.")447 self.__filters = value448 449 def filter_advertisements(self, advertisements):450 """Filter received beacon 
advertisements based on filters.451 452 Args:453 advertisements (pandas.DataFrame): Parsed advertisements.454 455 Returns:456 Advertisements with all entries that were not compliant with the 457 filters removed.458 """459 for key, value in self.filters.items():460 # Filter based on fixed identifiers461 if key in ID_FILTERS:462 advertisements = advertisements[advertisements[key].isin([value])]463 # Filter based on measurements464 else:465 query_str = f"{value[0]} <= {key} and {key} <= {value[1]}"466 advertisements = advertisements.query(query_str)467 advertisements.reset_index(inplace=True, drop=True)468 return advertisements469 470 def process_scans(self, scans, timestamps):471 """Process collection of received beacon advertisement scans.472 473 Organize collection of received beacon advertisement scans according 474 to address, payload, and measurements.475 Args:476 scans (list): Received beacon advertisement scans. Each element 477 contains all advertisements received from one scan. Elements 478 are in temporal order.479 timestamps (list): Timestamps associated with each scan.480 481 Returns:482 Advertisements organized in a pandas.DataFrame by address first, 483 timestamp second, and then remainder of advertisement payload, 484 e.g., UUID, major, minor, etc.485 """486 # Collect all advertisements487 advertisements = []488 for (scan, timestamp) in zip_longest(scans, timestamps):489 for address, payload in scan.items():490 advertisement = {'ADDRESS': address, 'TIMESTAMP': timestamp}491 advertisement['UUID'] = payload[0]492 advertisement['MAJOR'] = payload[1]493 advertisement['MINOR'] = payload[2]494 advertisement['TX POWER'] = payload[3]495 advertisement['RSSI'] = payload[4]496 advertisements.append(advertisement)497 # Format into DataFrame498 return pd.DataFrame(advertisements,columns=['ADDRESS', 'TIMESTAMP', 499 'UUID', 'MAJOR', 'MINOR', 'TX POWER', 'RSSI'])500 def scan(self, scan_prefix='', timeout=0, revisit=1):501 """Execute BLE beacon scan.502 503 Args:504 
scan_prefix (str): Scan output file prefix. Final output file name505 will be appended with first scan start timestamp. Defaults to506 configuration value.507 timeout (int, float): Time (s) for which to advertise beacon. If 508 specified as None then advertises till user commanded stop via 509 control file. Defaults to configuration value.510 revisit (int): Time interval (s) between consecutive scans. 511 Defaults to 1.512 513 Returns:514 Filtered advertisements organized in a pandas.DataFrame by address 515 first, timestamp second, and then remainder of advertisement 516 payload, e.g., UUID, major, minor, etc.517 """518 # Parse inputs519 if scan_prefix == '':520 scan_prefix = self.scan_prefix521 if timeout == 0:522 timeout = self.timeout523 # Update control file and scan output file524 with open(self.__control_file, 'w') as f:525 f.write("0")526 scan_file = Path(f"{scan_prefix}_{datetime.now():%Y%m%dT%H%M%S}.csv")527 # Start advertising528 self.__logger.info(f"Starting beacon scanner with timeout {timeout}.")529 self.__control_file_handle = self.__control_file.open(mode='r+')530 run = True 531 timestamps = []532 scans = []533 scan_count = 0534 start_time = time.monotonic()535 while run:536 scan_count += 1537 self.__logger.debug(f"Performing scan #{scan_count} at revisit "538 f"{self.revisit}.")539 timestamps.append(datetime.now())540 scans.append(self.__service.scan(self.revisit))541 # Stop advertising based on either timeout or control file542 if timeout is not None:543 if (time.monotonic()-start_time) > timeout:544 self.__logger.debug("Beacon scanner timed out.")545 run = False546 self.__control_file_handle.seek(0)547 control_flag = self.__control_file_handle.read()548 if control_flag != "0":549 self.__logger.debug("Beacon scanner control flag set to stop.")550 run = False551 self.__logger.info("Stopping beacon scanner.")552 # Cleanup553 self.__control_file_handle.close()554 with self.__control_file.open('w') as f:555 f.write("0")556 # Process, filter, and 
output received scans557 advertisements = self.process_scans(scans, timestamps)558 advertisements = self.filter_advertisements(advertisements)559 advertisements.to_csv(scan_file, index_label='SCAN')560 return advertisements561 562def setup_logger(config):563 """Setup and return logger based on configuration."""564 logging.config.dictConfig(config['config'])565 return logging.getLogger(config['name'])566 567def close_logger(logger):568 """Close logger."""569 for handler in logger.handlers[:]:570 handler.close()571 logger.removeHandler(handler)572 573def load_config(parsed_args):574 """Load configuration.575 Loads beacon/scanner configuration from parsed input argument. Any576 expected keys not specified use values from default configuration.577 Args:578 parsed_args (Namespace): Parsed input arguments.579 580 Returns:581 Configuration dictionary.582 """583 # Load default configuration if none specified584 if parsed_args['config_yml'] is None:585 config = DEFAULT_CONFIG586 # Load configuration YAML587 else:588 with open(parsed_args['config_yml'], 'r') as f:589 config = yaml.load(f, Loader=yaml.SafeLoader)590 config['advertiser'] = {**DEFAULT_CONFIG['advertiser'], 591 **config['advertiser']}592 config['scanner'] = {**DEFAULT_CONFIG['scanner'], 593 **config['scanner']}594 # Merge configuration values with command line options595 for key, value in parsed_args.items():596 if value is not None:597 if key in config['advertiser']:598 config['advertiser'][key] = value599 if key in config['scanner']:600 config['scanner'][key] = value601 # Remove malformed filters602 if config['scanner']['filters'] is not None:603 filters_to_remove = []604 for key, value in config['scanner']['filters'].items():605 if key not in ALLOWABLE_FILTERS:606 filters_to_remove.append(key)607 elif value is None:608 filters_to_remove.append(key)609 elif key in MEASUREMENT_FILTERS and len(value) != 2:610 filters_to_remove.append(key)611 for filter_to_remove in filters_to_remove:612 del 
config['scanner']['filters'][filter_to_remove]613 return config614 615def parse_args(args):616 """Input argument parser.617 Args:618 args (list): Input arguments as taken from sys.argv.619 620 Returns:621 Dictionary containing parsed input arguments. Keys are argument names.622 """623 # Parse command line arguments624 parser = argparse.ArgumentParser(625 description=("BLE beacon advertiser or scanner. Command line "626 "arguments will override their corresponding value in "627 "a configuration file if specified."))628 mode_group = parser.add_mutually_exclusive_group(required=True)629 mode_group.add_argument('-a', '--advertiser', action='store_true',630 help="Beacon advertiser mode.")631 mode_group.add_argument('-s', '--scanner', action='store_true',632 help="Beacon scanner mode.")633 parser.add_argument('--config_yml', help="Configuration YAML.")634 parser.add_argument('--control_file', help="Control file.")635 parser.add_argument('--scan_prefix', help="Scan output file prefix.")636 parser.add_argument('--timeout', type=float, 637 help="Timeout (s) for both beacon advertiser and scanner modes.")638 parser.add_argument('--uuid', help="Beacon advertiser UUID.")639 parser.add_argument('--major', type=int, 640 help="Beacon advertiser major value.")641 parser.add_argument('--minor', type=int, 642 help="Beacon advertiser minor value.")643 parser.add_argument('--tx_power', type=int, 644 help="Beacon advertiser TX power.")645 parser.add_argument('--interval', type=int,646 help="Beacon advertiser interval (ms).")647 parser.add_argument('--revisit', type=int, 648 help="Beacon scanner revisit interval (s)")649 return vars(parser.parse_args(args))650 651def main(args):652 """Creates beacon and either starts advertising or scanning.653 654 Args:655 args (list): Arguments as provided by sys.argv.656 Returns:657 If advertising then no output (None) is returned. 
If scanning 658 then scanned advertisements are returned in pandas.DataFrame.659 """660 # Initial setup661 parsed_args = parse_args(args)662 config = load_config(parsed_args)663 logger = setup_logger(config['logger'])664 logger.debug(f"Beacon configuration - {config['advertiser']}")665 logger.debug(f"Scanner configuration - {config['scanner']}")666 667 # Create and start beacon advertiser or scanner668 try:669 if parsed_args['advertiser']:670 logger.info("Beacon advertiser mode selected.")671 advertiser = Advertiser(logger, **config['advertiser'])672 advertiser.advertise()673 output = None674 elif parsed_args['scanner']:675 logger.info("Beacon scanner mode selected.")676 scanner = Scanner(logger, **config['scanner'])677 advertisements = scanner.scan()678 output = advertisements679 except Exception:680 logger.exception("Fatal exception encountered")681 finally:682 close_logger(logger)683 return output684 685if __name__ == "__main__":686 """Script execution."""687 main(sys.argv[1:])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful