Best Python code snippet using autotest_python
advertisement_scanning.py
Source:advertisement_scanning.py  
#!/usr/bin/python3
# -*- mode: python; coding: utf-8 -*-
"""Bluetooth Low Energy (BLE) beacon advertisement and scanning.

Execution of a BLE beacon for use in BWSI PiPact independent project.
Configuration of beacon done via external YAML. Underlying functionality
provided by PyBluez module (https://github.com/pybluez/pybluez). Beacon
uses iBeacon format (https://en.wikipedia.org/wiki/IBeacon).
"""

import argparse
from bluetooth.ble import BeaconService
import copy
from datetime import datetime
from itertools import zip_longest
import logging
import logging.config
import pandas as pd
from pathlib import Path
import sys
import time
from uuid import uuid1
import yaml

# Default configuration
LOG_NAME = 'pi_pact.log'
DEFAULT_CONFIG = {
    'advertiser': {
        'control_file': "advertiser_control",
        'timeout': None,
        'uuid': '',
        'major': 1,
        'minor': 1,
        'tx_power': 1,
        'interval': 200
        },
    'scanner': {
        'control_file': "scanner_control",
        'scan_prefix': "pi_pact_scan",
        'timeout': None,
        'revisit': 1,
        'filters': {}
        },
    'logger': {
        'name': LOG_NAME,
        'config': {
            'version': 1,
            'formatters': {
                'full': {
                    'format': '%(asctime)s   %(module)-10s   %(levelname)-8s   %(message)s'},
                'brief': {
                    'format': '%(asctime)s   %(levelname)-8s   %(message)s'},
                },
            'handlers': {
                'console': {
                    'class': 'logging.StreamHandler',
                    'level': 'INFO',
                    'formatter': 'brief'
                    },
                'file': {
                    'class': 'logging.handlers.TimedRotatingFileHandler',
                    'level': 'DEBUG',
                    'formatter': 'full',
                    'filename': LOG_NAME,
                    'when': 'H',
                    'interval': 1
                    }
                },
            'loggers': {
                LOG_NAME: {
                    'level': 'DEBUG',
                    'handlers': ['console', 'file']
                    }
                }
            }
        }
    }

# Universal settings
BLE_DEVICE = "hci0"
CONTROL_INTERVAL = 1 # (s)
MAX_TIMEOUT = 600 # (s)
ID_FILTERS = ['ADDRESS', 'UUID', 'MAJOR', 'MINOR', 'TX POWER']
MEASUREMENT_FILTERS = ['TIMESTAMP', 'RSSI']

# Limits
MAJOR_LIMITS = [1, 65535]
MINOR_LIMITS = [1, 65535]
TX_POWER_LIMITS = [-40, 4]
INTERVAL_LIMITS = [20, 10000] # (ms)
ALLOWABLE_FILTERS = ID_FILTERS+MEASUREMENT_FILTERS


class Advertiser(object):
    """Instantiates a BLE beacon advertiser.

    Attributes:
        control_file (pathlib.Path): BLE beacon advertiser control file path.
        timeout (float, int): BLE beacon advertiser timeout (s). Must be
            strictly positive and no greater than 600.
        uuid (str): BLE beacon advertiser UUID. Expected to be 32 hexadecimal
            digits split into 5 groups separated by hyphens. The number of
            digits in each group (from first to last) is {8, 4, 4, 4, 12}.
            Only the type is validated here, not the format.
        major (int): BLE beacon advertiser major value. Must be in [1, 65535].
        minor (int): BLE beacon advertiser minor value. Must be in [1, 65535].
        tx_power (int): BLE beacon advertiser TX power value. Must be in
            [-40, 4].
        interval (int): BLE beacon advertiser interval (ms) value. Must be in
            [20, 10000].
    """

    def __init__(self, logger, **kwargs):
        """Instance initialization.

        Args:
            logger (logging.Logger): Configured logger.
            **kwargs: Keyword arguments corresponding to instance attributes.
                Any unassociated keyword arguments are ignored. Falsy values
                (None, '', 0, {}) fall back to the default configuration.
        """
        # Logger
        self.__logger = logger
        # Pre-set so that __del__ is safe even if a setter below raises
        # before the control file has been created.
        self.__control_file = None
        self.__control_file_handle = None
        # Beacon settings
        for key, value in DEFAULT_CONFIG['advertiser'].items():
            if key in kwargs and kwargs[key]:
                setattr(self, key, kwargs[key])
            else:
                self.__logger.debug("Using default beacon advertiser "
                        f"configuration {key}: {value}.")
                setattr(self, key, value)
        # Create beacon
        self.__service = BeaconService(BLE_DEVICE)
        self.__logger.info("Initialized beacon advertiser.")

    def __del__(self):
        """Instance destruction.

        Closes any open control file handle and removes the control file.
        """
        if self.__control_file_handle is not None:
            self.__control_file_handle.close()
        if self.__control_file is not None:
            try:
                self.__control_file.unlink()
            except FileNotFoundError:
                # Control file was already removed; nothing left to clean up.
                pass

    @property
    def control_file(self):
        """BLE beacon advertiser control file path getter."""
        return self.__control_file

    @control_file.setter
    def control_file(self, value):
        """BLE beacon advertiser control file path setter.

        Creates the file, opens its permissions so that other users can
        write the stop flag, and initializes the flag to "0" (run).

        Raises:
            TypeError: Beacon advertiser control file must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon advertiser control file must be a string.")
        self.__control_file = Path(value).resolve()
        self.__control_file.touch()
        self.__control_file.chmod(0o777)
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        self.__control_file_handle = None

    @property
    def timeout(self):
        """BLE beacon advertiser timeout getter."""
        return self.__timeout

    @timeout.setter
    def timeout(self, value):
        """BLE beacon advertiser timeout setter.

        Raises:
            TypeError: Beacon advertiser timeout must be a float, integer, or
                NoneType.
            ValueError: Beacon advertiser timeout must be strictly positive.
            ValueError: Beacon advertiser timeout cannot exceed maximum
                allowable timeout.
        """
        if value is not None:
            if not isinstance(value, (float, int)):
                raise TypeError("Beacon advertiser timeout must be a float, "
                        "integer, or NoneType.")
            elif value <= 0:
                raise ValueError("Beacon advertiser timeout must be strictly "
                        "positive.")
            elif value > MAX_TIMEOUT:
                raise ValueError("Beacon advertiser timeout cannot exceed "
                        "maximum allowable timeout.")
        self.__timeout = value

    @property
    def uuid(self):
        """BLE beacon advertiser UUID getter."""
        return self.__uuid

    @uuid.setter
    def uuid(self, value):
        """BLE beacon advertiser UUID setter.

        An empty string requests a freshly generated (uuid1) UUID.

        Raises:
            TypeError: Beacon advertiser UUID must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon advertiser UUID must be a string.")
        elif not value:
            self.__uuid = str(uuid1())
            self.__logger.debug(f"Beacon advertiser UUID set to {self.__uuid}")
        else:
            self.__uuid = value

    @property
    def major(self):
        """BLE beacon advertiser major value getter."""
        return self.__major

    @major.setter
    def major(self, value):
        """BLE beacon advertiser major value setter.

        Raises:
            TypeError: Beacon advertiser major value must be an integer.
            ValueError: Beacon advertiser major value must be in [1, 65535].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser major value must be an integer.")
        elif value < MAJOR_LIMITS[0] or value > MAJOR_LIMITS[1]:
            raise ValueError("Beacon advertiser major value must be in range "
                    f"{MAJOR_LIMITS}.")
        self.__major = value

    @property
    def minor(self):
        """BLE beacon advertiser minor value getter."""
        return self.__minor

    @minor.setter
    def minor(self, value):
        """BLE beacon advertiser minor value setter.

        Raises:
            TypeError: Beacon advertiser minor value must be an integer.
            ValueError: Beacon advertiser minor value must be in [1, 65535].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser minor value must be an integer.")
        elif value < MINOR_LIMITS[0] or value > MINOR_LIMITS[1]:
            raise ValueError("Beacon advertiser minor value must be in range "
                    f"{MINOR_LIMITS}.")
        self.__minor = value

    @property
    def tx_power(self):
        """BLE beacon advertiser TX power value getter."""
        return self.__tx_power

    @tx_power.setter
    def tx_power(self, value):
        """BLE beacon advertiser TX power setter.

        Raises:
            TypeError: Beacon advertiser TX power must be an integer.
            ValueError: Beacon advertiser TX power must be in [-40, 4].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser TX power must be an integer.")
        elif value < TX_POWER_LIMITS[0] or value > TX_POWER_LIMITS[1]:
            raise ValueError("Beacon advertiser TX power must be in range "
                    f"{TX_POWER_LIMITS}.")
        self.__tx_power = value

    @property
    def interval(self):
        """BLE beacon advertiser interval getter."""
        return self.__interval

    @interval.setter
    def interval(self, value):
        """BLE beacon advertiser interval setter.

        Raises:
            TypeError: Beacon advertiser interval must be an integer.
            ValueError: Beacon advertiser interval must be in [20, 10000].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser interval must be an integer.")
        elif value < INTERVAL_LIMITS[0] or value > INTERVAL_LIMITS[1]:
            raise ValueError("Beacon advertiser interval must be in range "
                    f"{INTERVAL_LIMITS}.")
        self.__interval = value

    def advertise(self, timeout=0):
        """Execute BLE beacon advertisement.

        Advertising continues until the timeout elapses or the control file
        contains anything other than "0" (surrounding whitespace ignored).

        Args:
            timeout (int, float): Time (s) for which to advertise beacon. If
                specified as None then advertises till user commanded stop via
                control file. The default of 0 means "use the configured
                timeout attribute".
        """
        # Parse inputs
        if timeout == 0:
            timeout = self.timeout
        # Update control file
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        # Start advertising
        self.__logger.info("Starting beacon advertiser with timeout "
                f"{timeout}.")
        self.__service.start_advertising(self.uuid, self.major, self.minor,
                                         self.tx_power, self.interval)
        # Stop advertising based on either timeout or control file
        start_time = time.monotonic()
        try:
            self.__control_file_handle = self.__control_file.open(mode='r+')
            run = True
            while run:
                time.sleep(CONTROL_INTERVAL)
                if timeout is not None:
                    if (time.monotonic()-start_time) > timeout:
                        self.__logger.debug("Beacon advertiser timed out.")
                        run = False
                self.__control_file_handle.seek(0)
                # Strip so that a flag written with a trailing newline
                # (e.g. `echo 0 > file`) is still read as "keep running".
                control_flag = self.__control_file_handle.read().strip()
                if control_flag != "0":
                    self.__logger.debug("Beacon advertiser control flag set to "
                            "stop.")
                    run = False
        finally:
            # Always stop the radio and release the handle, even on error.
            self.__logger.info("Stopping beacon advertiser.")
            self.__service.stop_advertising()
            # Cleanup
            if self.__control_file_handle is not None:
                self.__control_file_handle.close()
                self.__control_file_handle = None
            with self.__control_file.open('w') as f:
                f.write("0")


class Scanner(object):
    """Instantiates a BLE beacon scanner.

    Attributes:
        control_file (pathlib.Path): BLE beacon scanner control file path.
        scan_prefix (str): Prefix of the CSV file scans are written to.
        timeout (float, int): BLE beacon scanner timeout (s). Must be strictly
            positive and no greater than 600.
        revisit (int): BLE beacon scanner revisit interval (s). Must be
            strictly positive.
        filters (dict): Filters to apply to received beacons. Available
            filters/keys are {'ADDRESS', 'UUID', 'MAJOR', 'MINOR',
            'TX POWER', 'TIMESTAMP', 'RSSI'}.
    """

    def __init__(self, logger, **kwargs):
        """Instance initialization.

        Args:
            logger (logging.Logger): Configured logger.
            **kwargs: Keyword arguments corresponding to instance attributes.
                Any unassociated keyword arguments are ignored. Falsy values
                (None, '', 0, {}) fall back to the default configuration.
        """
        # Logger
        self.__logger = logger
        # Pre-set so that __del__ is safe even if a setter below raises
        # before the control file has been created.
        self.__control_file = None
        self.__control_file_handle = None
        # Beacon settings
        for key, value in DEFAULT_CONFIG['scanner'].items():
            if key in kwargs and kwargs[key]:
                setattr(self, key, kwargs[key])
            else:
                self.__logger.debug("Using default beacon scanner "
                        f"configuration {key}: {value}.")
                setattr(self, key, value)
        # Create beacon
        self.__service = BeaconService(BLE_DEVICE)
        self.__logger.info("Initialized beacon scanner.")

    def __del__(self):
        """Instance destruction.

        Closes any open control file handle and removes the control file.
        """
        if self.__control_file_handle is not None:
            self.__control_file_handle.close()
        if self.__control_file is not None:
            try:
                self.__control_file.unlink()
            except FileNotFoundError:
                # Control file was already removed; nothing left to clean up.
                pass

    @property
    def control_file(self):
        """BLE beacon scanner control file path getter."""
        return self.__control_file

    @control_file.setter
    def control_file(self, value):
        """BLE beacon scanner control file path setter.

        Creates the file, opens its permissions so that other users can
        write the stop flag, and initializes the flag to "0" (run).

        Raises:
            TypeError: Beacon scanner control file must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon scanner control file must be a string.")
        # Initialize control file
        self.__control_file = Path(value).resolve()
        self.__control_file.touch()
        self.__control_file.chmod(0o777)
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        self.__control_file_handle = None

    @property
    def scan_prefix(self):
        """BLE beacon scanner scan file prefix getter."""
        return self.__scan_prefix

    @scan_prefix.setter
    def scan_prefix(self, value):
        """BLE beacon scanner scan file prefix setter.

        Raises:
            TypeError: Beacon scanner scan file prefix must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon scanner scan file prefix must be a string.")
        self.__scan_prefix = value

    @property
    def timeout(self):
        """BLE beacon scanner timeout getter."""
        return self.__timeout

    @timeout.setter
    def timeout(self, value):
        """BLE beacon scanner timeout setter.

        Raises:
            TypeError: Beacon scanner timeout must be a float, integer, or
                NoneType.
            ValueError: Beacon scanner timeout must be strictly positive.
            ValueError: Beacon scanner cannot exceed maximum allowable timeout.
        """
        if value is not None:
            if not isinstance(value, (float, int)):
                raise TypeError("Beacon scanner timeout must be a float, "
                        "integer, or NoneType.")
            elif value <= 0:
                raise ValueError("Beacon scanner timeout must be strictly "
                        "positive.")
            elif value > MAX_TIMEOUT:
                raise ValueError("Beacon scanner timeout cannot exceed "
                        "maximum allowable timeout.")
        self.__timeout = value

    @property
    def revisit(self):
        """BLE beacon scanner revisit interval getter."""
        return self.__revisit

    @revisit.setter
    def revisit(self, value):
        """BLE beacon scanner revisit interval setter.

        Raises:
            TypeError: Beacon scanner revisit interval must be an integer.
            ValueError: Beacon scanner revisit interval must be strictly
                positive.
        """
        if not isinstance(value, int):
            raise TypeError("Beacon scanner revisit interval must be an "
                    "integer.")
        elif value <= 0:
            raise ValueError("Beacon scanner revisit interval must strictly "
                    "positive.")
        self.__revisit = value

    @property
    def filters(self):
        """BLE beacon scanner filters getter."""
        return self.__filters

    @filters.setter
    def filters(self, value):
        """BLE beacon scanner filters setter.

        Raises:
            TypeError: Beacon scanner filters must be a dictionary.
            KeyError: Beacon scanner filters must be one of allowable filters.
        """
        if not isinstance(value, dict):
            raise TypeError("Beacon scanner filters must be a dictionary.")
        elif not all(key in ALLOWABLE_FILTERS for key in value):
            raise KeyError("Beacon scanner filters must be one of allowable "
                    f"filters {ALLOWABLE_FILTERS}.")
        self.__filters = value

    def filter_advertisements(self, advertisements):
        """Filter received beacon advertisements based on filters.

        Identifier filters (ID_FILTERS) keep rows whose column equals the
        filter value, or is one of the values when a list/tuple/set is given.
        Measurement filters keep rows whose column lies in the inclusive
        range [value[0], value[1]].

        Args:
            advertisements (pandas.DataFrame): Parsed advertisements.

        Returns:
            New pandas.DataFrame with all entries that were not compliant
            with the filters removed and the index renumbered from 0. The
            input frame is not modified.
        """
        for key, value in self.filters.items():
            # Filter based on fixed identifiers
            if key in ID_FILTERS:
                if isinstance(value, (list, tuple, set)):
                    allowed = list(value)
                else:
                    allowed = [value]
                mask = advertisements[key].isin(allowed)
            # Filter based on measurements
            else:
                mask = advertisements[key].between(value[0], value[1])
            advertisements = advertisements[mask]
        # Non-inplace reset so neither the caller's frame nor a filtered
        # view of it is mutated.
        return advertisements.reset_index(drop=True)

    def process_scans(self, scans, timestamps):
        """Process collection of received beacon advertisement scans.

        Organize collection of received beacon advertisement scans according
        to address, payload, and measurements.

        Args:
            scans (list): Received beacon advertisement scans. Each element
                is a mapping of address to payload for one scan, where the
                payload is indexed as (UUID, major, minor, TX power, RSSI).
                Elements are in temporal order.
            timestamps (list): Timestamps associated with each scan.

        Returns:
            Advertisements organized in a pandas.DataFrame with columns
            ADDRESS, TIMESTAMP, UUID, MAJOR, MINOR, TX POWER, RSSI.
        """
        # Collect all advertisements
        advertisements = []
        for (scan, timestamp) in zip_longest(scans, timestamps):
            if scan is None:
                # More timestamps than scans; nothing to record for this one.
                continue
            for address, payload in scan.items():
                advertisements.append({
                    'ADDRESS': address,
                    'TIMESTAMP': timestamp,
                    'UUID': payload[0],
                    'MAJOR': payload[1],
                    'MINOR': payload[2],
                    'TX POWER': payload[3],
                    'RSSI': payload[4],
                    })
        # Format into DataFrame
        return pd.DataFrame(advertisements, columns=['ADDRESS', 'TIMESTAMP',
            'UUID', 'MAJOR', 'MINOR', 'TX POWER', 'RSSI'])

    def scan(self, scan_prefix='', timeout=0, revisit=1):
        """Execute BLE beacon scan.

        Scanning continues until the timeout elapses or the control file
        contains anything other than "0" (surrounding whitespace ignored).
        Results are written to "<scan_prefix>_<start timestamp>.csv".

        Args:
            scan_prefix (str): Scan output file prefix. Final output file name
                will be appended with first scan start timestamp. The default
                of '' means "use the configured scan_prefix attribute".
            timeout (int, float): Time (s) for which to scan. If specified as
                None then scans till user commanded stop via control file.
                The default of 0 means "use the configured timeout attribute".
            revisit (int): Accepted for backward compatibility but not used;
                the configured revisit attribute sets the duration (s) of
                each individual scan.

        Returns:
            Filtered advertisements organized in a pandas.DataFrame by address
            first, timestamp second, and then remainder of advertisement
            payload, e.g., UUID, major, minor, etc.
        """
        # Parse inputs
        if scan_prefix == '':
            scan_prefix = self.scan_prefix
        if timeout == 0:
            timeout = self.timeout
        # Update control file and scan output file
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        scan_file = Path(f"{scan_prefix}_{datetime.now():%Y%m%dT%H%M%S}.csv")
        # Start scanning
        self.__logger.info(f"Starting beacon scanner with timeout {timeout}.")
        timestamps = []
        scans = []
        scan_count = 0
        start_time = time.monotonic()
        try:
            self.__control_file_handle = self.__control_file.open(mode='r+')
            run = True
            while run:
                scan_count += 1
                self.__logger.debug(f"Performing scan #{scan_count} at revisit "
                        f"{self.revisit}.")
                timestamps.append(datetime.now())
                scans.append(self.__service.scan(self.revisit))
                # Stop scanning based on either timeout or control file
                if timeout is not None:
                    if (time.monotonic()-start_time) > timeout:
                        self.__logger.debug("Beacon scanner timed out.")
                        run = False
                self.__control_file_handle.seek(0)
                # Strip so that a flag written with a trailing newline
                # (e.g. `echo 0 > file`) is still read as "keep running".
                control_flag = self.__control_file_handle.read().strip()
                if control_flag != "0":
                    self.__logger.debug("Beacon scanner control flag set to stop.")
                    run = False
        finally:
            # Always release the handle and reset the flag, even on error.
            self.__logger.info("Stopping beacon scanner.")
            # Cleanup
            if self.__control_file_handle is not None:
                self.__control_file_handle.close()
                self.__control_file_handle = None
            with self.__control_file.open('w') as f:
                f.write("0")
        # Process, filter, and output received scans
        advertisements = self.process_scans(scans, timestamps)
        advertisements = self.filter_advertisements(advertisements)
        advertisements.to_csv(scan_file, index_label='SCAN')
        return advertisements


def setup_logger(config):
    """Setup and return logger based on configuration.

    Args:
        config (dict): Logger section of the configuration, holding the
            logger 'name' and a logging.config.dictConfig 'config' schema.
    """
    logging.config.dictConfig(config['config'])
    return logging.getLogger(config['name'])


def close_logger(logger):
    """Close and detach every handler attached to the logger."""
    # Iterate over a copy because handlers are removed while looping.
    for handler in logger.handlers[:]:
        handler.close()
        logger.removeHandler(handler)


def load_config(parsed_args):
    """Load configuration.

    Loads beacon/scanner configuration from parsed input argument. Any
    expected keys or sections not specified use values from default
    configuration. The module-level DEFAULT_CONFIG is never modified.

    Args:
        parsed_args (dict): Parsed input arguments as returned by parse_args.

    Returns:
        Configuration dictionary.
    """
    # Start from a private copy so overrides never leak into the defaults
    config = copy.deepcopy(DEFAULT_CONFIG)
    # Load configuration YAML
    if parsed_args['config_yml'] is not None:
        with open(parsed_args['config_yml'], 'r') as f:
            # An empty YAML document loads as None
            user_config = yaml.load(f, Loader=yaml.SafeLoader) or {}
        for section, values in user_config.items():
            if section in ('advertiser', 'scanner'):
                # Per-key merge over the defaults
                config[section].update(values or {})
            else:
                # Other sections (e.g. logger) replace the default wholesale
                config[section] = values
    # Merge configuration values with command line options
    for key, value in parsed_args.items():
        if value is not None:
            if key in config['advertiser']:
                config['advertiser'][key] = value
            if key in config['scanner']:
                config['scanner'][key] = value
    # Remove malformed filters. Non-dict filters are left untouched so the
    # Scanner.filters setter reports them with its own TypeError.
    filters = config['scanner']['filters']
    if isinstance(filters, dict):
        filters_to_remove = []
        for key, value in filters.items():
            if key not in ALLOWABLE_FILTERS:
                filters_to_remove.append(key)
            elif value is None:
                filters_to_remove.append(key)
            elif key in MEASUREMENT_FILTERS and (
                    not isinstance(value, (list, tuple)) or len(value) != 2):
                # Measurement filters must be a [low, high] pair
                filters_to_remove.append(key)
        for filter_to_remove in filters_to_remove:
            del filters[filter_to_remove]
    return config


def parse_args(args):
    """Input argument parser.

    Args:
        args (list): Input arguments as taken from sys.argv.

    Returns:
        Dictionary containing parsed input arguments. Keys are argument names.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description=("BLE beacon advertiser or scanner. Command line "
                     "arguments will override their corresponding value in "
                     "a configuration file if specified."))
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument('-a', '--advertiser', action='store_true',
                            help="Beacon advertiser mode.")
    mode_group.add_argument('-s', '--scanner', action='store_true',
                            help="Beacon scanner mode.")
    parser.add_argument('--config_yml', help="Configuration YAML.")
    parser.add_argument('--control_file', help="Control file.")
    parser.add_argument('--scan_prefix', help="Scan output file prefix.")
    parser.add_argument('--timeout', type=float,
            help="Timeout (s) for both beacon advertiser and  scanner modes.")
    parser.add_argument('--uuid', help="Beacon advertiser UUID.")
    parser.add_argument('--major', type=int,
            help="Beacon advertiser major value.")
    parser.add_argument('--minor', type=int,
            help="Beacon advertiser minor value.")
    parser.add_argument('--tx_power', type=int,
            help="Beacon advertiser TX power.")
    parser.add_argument('--interval', type=int,
            help="Beacon advertiser interval (ms).")
    parser.add_argument('--revisit', type=int,
            help="Beacon scanner revisit interval (s)")
    return vars(parser.parse_args(args))


def main(args):
    """Creates beacon and either starts advertising or scanning.

    Args:
        args (list): Arguments as provided by sys.argv.

    Returns:
        If advertising then no output (None) is returned. If scanning
        then scanned advertisements are returned in pandas.DataFrame.
        None is also returned when a fatal exception was logged.
    """
    # Initial setup
    parsed_args = parse_args(args)
    config = load_config(parsed_args)
    logger = setup_logger(config['logger'])
    logger.debug(f"Beacon configuration - {config['advertiser']}")
    logger.debug(f"Scanner configuration - {config['scanner']}")

    # Bound up front so the return below is valid even after an exception
    output = None
    # Create and start beacon advertiser or scanner
    try:
        if parsed_args['advertiser']:
            logger.info("Beacon advertiser mode selected.")
            advertiser = Advertiser(logger, **config['advertiser'])
            advertiser.advertise()
        elif parsed_args['scanner']:
            logger.info("Beacon scanner mode selected.")
            scanner = Scanner(logger, **config['scanner'])
            output = scanner.scan()
    except Exception:
        logger.exception("Fatal exception encountered")
    finally:
        close_logger(logger)
    return output


if __name__ == "__main__":
    # Script execution
    main(sys.argv[1:])
Source:pi_pact.py  
#!/usr/bin/python3
# -*- mode: python; coding: utf-8 -*-
"""Bluetooth Low Energy (BLE) beacon advertisement and scanning.

Execution of a BLE beacon for use in BWSI PiPact independent project.
Configuration of beacon done via external YAML. Underlying functionality
provided by PyBluez module (https://github.com/pybluez/pybluez). Beacon
uses iBeacon format (https://en.wikipedia.org/wiki/IBeacon).
"""

import argparse
from bluetooth.ble import BeaconService
import copy
from datetime import datetime
from itertools import zip_longest
import logging
import logging.config
import pandas as pd
from pathlib import Path
import sys
import time
from uuid import uuid1
import yaml

# Default configuration
LOG_NAME = 'pi_pact.log'
DEFAULT_CONFIG = {
    'advertiser': {
        'control_file': "advertiser_control",
        'timeout': None,
        'uuid': '',
        'major': 1,
        'minor': 1,
        'tx_power': 1,
        'interval': 200
        },
    'scanner': {
        'control_file': "scanner_control",
        'scan_prefix': "pi_pact_scan",
        'timeout': None,
        'revisit': 1,
        'filters': {}
        },
    'logger': {
        'name': LOG_NAME,
        'config': {
            'version': 1,
            'formatters': {
                'full': {
                    'format': '%(asctime)s   %(module)-10s   %(levelname)-8s   %(message)s'},
                'brief': {
                    'format': '%(asctime)s   %(levelname)-8s   %(message)s'},
                },
            'handlers': {
                'console': {
                    'class': 'logging.StreamHandler',
                    'level': 'INFO',
                    'formatter': 'brief'
                    },
                'file': {
                    'class': 'logging.handlers.TimedRotatingFileHandler',
                    'level': 'DEBUG',
                    'formatter': 'full',
                    'filename': LOG_NAME,
                    'when': 'H',
                    'interval': 1
                    }
                },
            'loggers': {
                LOG_NAME: {
                    'level': 'DEBUG',
                    'handlers': ['console', 'file']
                    }
                }
            }
        }
    }

# Universal settings
BLE_DEVICE = "hci0"
CONTROL_INTERVAL = 1 # (s)
MAX_TIMEOUT = 600 # (s)
ID_FILTERS = ['ADDRESS', 'UUID', 'MAJOR', 'MINOR', 'TX POWER']
MEASUREMENT_FILTERS = ['TIMESTAMP', 'RSSI']

# Limits
MAJOR_LIMITS = [1, 65535]
MINOR_LIMITS = [1, 65535]
TX_POWER_LIMITS = [-40, 4]
INTERVAL_LIMITS = [20, 10000] # (ms)
ALLOWABLE_FILTERS = ID_FILTERS+MEASUREMENT_FILTERS


class Advertiser(object):
    """Instantiates a BLE beacon advertiser.

    Attributes:
        control_file (pathlib.Path): BLE beacon advertiser control file path.
        timeout (float, int): BLE beacon advertiser timeout (s). Must be
            strictly positive and at most 600.
        uuid (str): BLE beacon advertiser UUID. Must be 32 hexadecimal digits
            split into 5 groups separated by hyphens. The number of digits in
            each group (from first to last) is {8, 4, 4, 4, 12}.
        major (int): BLE beacon advertiser major value. Must be in [1, 65535].
        minor (int): BLE beacon advertiser minor value. Must be in [1, 65535].
        tx_power (int): BLE beacon advertiser TX power value. Must be in
            [-40, 4].
        interval (int): BLE beacon advertiser interval (ms) value. Must be in
            [20, 10000].
    """

    def __init__(self, logger, **kwargs):
        """Instance initialization.

        Args:
            logger (logging.Logger): Configured logger.
            **kwargs: Keyword arguments corresponding to instance attributes.
                Any unassociated keyword arguments are ignored. Falsy values
                (None, '', 0, {}) fall back to the default configuration.
        """
        # Logger
        self.__logger = logger
        # Pre-set so __del__ is safe even if a setter below raises
        self.__control_file = None
        self.__control_file_handle = None
        # Beacon settings
        for key, value in DEFAULT_CONFIG['advertiser'].items():
            if key in kwargs and kwargs[key]:
                setattr(self, key, kwargs[key])
            else:
                self.__logger.debug("Using default beacon advertiser "
                        f"configuration {key}: {value}.")
                setattr(self, key, value)
        # Create beacon
        self.__service = BeaconService(BLE_DEVICE)
        self.__logger.info("Initialized beacon advertiser.")

    def __del__(self):
        """Instance destruction.

        Closes any open control file handle and removes the control file.
        A control file that was never created or is already gone is ignored.
        """
        if self.__control_file_handle is not None:
            self.__control_file_handle.close()
        if self.__control_file is not None:
            try:
                self.__control_file.unlink()
            except FileNotFoundError:
                pass

    @property
    def control_file(self):
        """BLE beacon advertiser control file path getter."""
        return self.__control_file

    @control_file.setter
    def control_file(self, value):
        """BLE beacon advertiser control file path setter.

        Creates the file, makes it world read/write/executable, and writes
        the "keep running" flag ("0") into it.

        Raises:
            TypeError: Beacon advertiser control file must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon advertiser control file must be a string.")
        self.__control_file = Path(value).resolve()
        self.__control_file.touch()
        self.__control_file.chmod(0o777)
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        self.__control_file_handle = None

    @property
    def timeout(self):
        """BLE beacon advertiser timeout getter."""
        return self.__timeout

    @timeout.setter
    def timeout(self, value):
        """BLE beacon advertiser timeout setter.

        Raises:
            TypeError: Beacon advertiser timeout must be a float, integer, or
                NoneType.
            ValueError: Beacon advertiser timeout must be strictly positive.
            ValueError: Beacon advertiser timeout cannot exceed maximum
                allowable timeout.
        """
        if value is not None:
            if not isinstance(value, (float, int)):
                raise TypeError("Beacon advertiser timeout must be a float, "
                        "integer, or NoneType.")
            elif value <= 0:
                raise ValueError("Beacon advertiser timeout must be strictly "
                        "positive.")
            elif value > MAX_TIMEOUT:
                raise ValueError("Beacon advertiser timeout cannot exceed "
                        "maximum allowable timeout.")
        self.__timeout = value

    @property
    def uuid(self):
        """BLE beacon advertiser UUID getter."""
        return self.__uuid

    @uuid.setter
    def uuid(self, value):
        """BLE beacon advertiser UUID setter.

        An empty string requests a freshly generated UUID (uuid1).

        Raises:
            TypeError: Beacon advertiser UUID must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon advertiser UUID must be a string.")
        elif not value:
            self.__uuid = str(uuid1())
            self.__logger.debug(f"Beacon advertiser UUID set to {self.__uuid}")
        else:
            self.__uuid = value

    @property
    def major(self):
        """BLE beacon advertiser major value getter."""
        return self.__major

    @major.setter
    def major(self, value):
        """BLE beacon advertiser major value setter.

        Raises:
            TypeError: Beacon advertiser major value must be an integer.
            ValueError: Beacon advertiser major value must be in [1, 65535].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser major value must be an integer.")
        elif value < MAJOR_LIMITS[0] or value > MAJOR_LIMITS[1]:
            raise ValueError("Beacon advertiser major value must be in range "
                    f"{MAJOR_LIMITS}.")
        self.__major = value

    @property
    def minor(self):
        """BLE beacon advertiser minor value getter."""
        return self.__minor

    @minor.setter
    def minor(self, value):
        """BLE beacon advertiser minor value setter.

        Raises:
            TypeError: Beacon advertiser minor value must be an integer.
            ValueError: Beacon advertiser minor value must be in [1, 65535].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser minor value must be an integer.")
        elif value < MINOR_LIMITS[0] or value > MINOR_LIMITS[1]:
            raise ValueError("Beacon advertiser minor value must be in range "
                    f"{MINOR_LIMITS}.")
        self.__minor = value

    @property
    def tx_power(self):
        """BLE beacon advertiser TX power value getter."""
        return self.__tx_power

    @tx_power.setter
    def tx_power(self, value):
        """BLE beacon advertiser TX power setter.

        Raises:
            TypeError: Beacon advertiser TX power must be an integer.
            ValueError: Beacon advertiser TX power must be in [-40, 4].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser TX power must be an integer.")
        elif value < TX_POWER_LIMITS[0] or value > TX_POWER_LIMITS[1]:
            raise ValueError("Beacon advertiser TX power must be in range "
                    f"{TX_POWER_LIMITS}.")
        self.__tx_power = value

    @property
    def interval(self):
        """BLE beacon advertiser interval getter."""
        return self.__interval

    @interval.setter
    def interval(self, value):
        """BLE beacon advertiser interval setter.

        Raises:
            TypeError: Beacon advertiser interval must be an integer.
            ValueError: Beacon advertiser interval must be in [20, 10000].
        """
        if not isinstance(value, int):
            raise TypeError("Beacon advertiser interval must be an integer.")
        elif value < INTERVAL_LIMITS[0] or value > INTERVAL_LIMITS[1]:
            raise ValueError("Beacon advertiser interval must be in range "
                    f"{INTERVAL_LIMITS}.")
        self.__interval = value

    def advertise(self, timeout=0):
        """Execute BLE beacon advertisement.

        Advertising stops when the timeout elapses or when the control file
        contains anything other than "0", whichever comes first. The control
        file is polled every CONTROL_INTERVAL seconds.

        Args:
            timeout (int, float): Time (s) for which to advertise beacon. If
                specified as None then advertises till user commanded stop via
                control file. The default of 0 is a sentinel meaning "use the
                configuration value" (None is itself a meaningful value).
        """
        # Parse inputs
        if timeout == 0:
            timeout = self.timeout
        # Update control file
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        # Start advertising
        self.__logger.info("Starting beacon advertiser with timeout "
                f"{timeout}.")
        self.__service.start_advertising(self.uuid, self.major, self.minor,
                                         self.tx_power, self.interval)
        try:
            # Stop advertising based on either timeout or control file
            start_time = time.monotonic()
            self.__control_file_handle = self.__control_file.open(mode='r+')
            run = True
            while run:
                time.sleep(CONTROL_INTERVAL)
                if timeout is not None:
                    if (time.monotonic()-start_time) > timeout:
                        self.__logger.debug("Beacon advertiser timed out.")
                        run = False
                self.__control_file_handle.seek(0)
                control_flag = self.__control_file_handle.read()
                if control_flag != "0":
                    self.__logger.debug("Beacon advertiser control flag set to "
                            "stop.")
                    run = False
        finally:
            # Always stop the radio and release the handle, even when the
            # polling loop is interrupted by an exception.
            self.__logger.info("Stopping beacon advertiser.")
            self.__service.stop_advertising()
            # Cleanup
            if self.__control_file_handle is not None:
                self.__control_file_handle.close()
                self.__control_file_handle = None
            with self.__control_file.open('w') as f:
                f.write("0")


class Scanner(object):
    """Instantiates a BLE beacon scanner.

    Attributes:
        control_file (pathlib.Path): BLE beacon scanner control file path.
        scan_prefix (str): Prefix of the CSV file scans are written to.
        timeout (float, int): BLE beacon scanner timeout (s). Must be strictly
            positive and at most 600.
        revisit (int): BLE beacon scanner revisit interval (s). Must be
            strictly positive.
        filters (dict): Filters to apply to received beacons. Allowed keys
            are those in ALLOWABLE_FILTERS: {'ADDRESS', 'UUID', 'MAJOR',
            'MINOR', 'TX POWER', 'TIMESTAMP', 'RSSI'}.
    """

    def __init__(self, logger, **kwargs):
        """Instance initialization.

        Args:
            logger (logging.Logger): Configured logger.
            **kwargs: Keyword arguments corresponding to instance attributes.
                Any unassociated keyword arguments are ignored. Falsy values
                (None, '', 0, {}) fall back to the default configuration.
        """
        # Logger
        self.__logger = logger
        # Pre-set so __del__ is safe even if a setter below raises
        self.__control_file = None
        self.__control_file_handle = None
        # Beacon settings
        for key, value in DEFAULT_CONFIG['scanner'].items():
            if key in kwargs and kwargs[key]:
                setattr(self, key, kwargs[key])
            else:
                self.__logger.debug("Using default beacon scanner "
                        f"configuration {key}: {value}.")
                setattr(self, key, value)
        # Create beacon
        self.__service = BeaconService(BLE_DEVICE)
        self.__logger.info("Initialized beacon scanner.")

    def __del__(self):
        """Instance destruction.

        Closes any open control file handle and removes the control file.
        A control file that was never created or is already gone is ignored.
        """
        if self.__control_file_handle is not None:
            self.__control_file_handle.close()
        if self.__control_file is not None:
            try:
                self.__control_file.unlink()
            except FileNotFoundError:
                pass

    @property
    def control_file(self):
        """BLE beacon scanner control file path getter."""
        return self.__control_file

    @control_file.setter
    def control_file(self, value):
        """BLE beacon scanner control file path setter.

        Creates the file, makes it world read/write/executable, and writes
        the "keep running" flag ("0") into it.

        Raises:
            TypeError: Beacon scanner control file must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon scanner control file must be a string.")
        # Initialize control file
        self.__control_file = Path(value).resolve()
        self.__control_file.touch()
        self.__control_file.chmod(0o777)
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        self.__control_file_handle = None

    @property
    def scan_prefix(self):
        """BLE beacon scanner scan file prefix getter."""
        return self.__scan_prefix

    @scan_prefix.setter
    def scan_prefix(self, value):
        """BLE beacon scanner scan file prefix setter.

        Raises:
            TypeError: Beacon scanner scan file prefix must be a string.
        """
        if not isinstance(value, str):
            raise TypeError("Beacon scanner scan file prefix must be a string.")
        self.__scan_prefix = value

    @property
    def timeout(self):
        """BLE beacon scanner timeout getter."""
        return self.__timeout

    @timeout.setter
    def timeout(self, value):
        """BLE beacon scanner timeout setter.

        Raises:
            TypeError: Beacon scanner timeout must be a float, integer, or
                NoneType.
            ValueError: Beacon scanner timeout must be strictly positive.
            ValueError: Beacon scanner cannot exceed maximum allowable timeout.
        """
        if value is not None:
            if not isinstance(value, (float, int)):
                raise TypeError("Beacon scanner timeout must be a float, "
                        "integer, or NoneType.")
            elif value <= 0:
                raise ValueError("Beacon scanner timeout must be strictly "
                        "positive.")
            elif value > MAX_TIMEOUT:
                raise ValueError("Beacon scanner timeout cannot exceed "
                        "maximum allowable timeout.")
        self.__timeout = value

    @property
    def revisit(self):
        """BLE beacon scanner revisit interval getter."""
        return self.__revisit

    @revisit.setter
    def revisit(self, value):
        """BLE beacon scanner revisit interval setter.

        Raises:
            TypeError: Beacon scanner revisit interval must be an integer.
            ValueError: Beacon scanner revisit interval must be strictly
                positive.
        """
        if not isinstance(value, int):
            raise TypeError("Beacon scanner revisit interval must be an "
                    "integer.")
        elif value <= 0:
            raise ValueError("Beacon scanner revisit interval must be "
                    "strictly positive.")
        self.__revisit = value

    @property
    def filters(self):
        """BLE beacon scanner filters getter."""
        return self.__filters

    @filters.setter
    def filters(self, value):
        """BLE beacon scanner filters setter.

        Raises:
            TypeError: Beacon scanner filters must be a dictionary.
            KeyError: Beacon scanner filters must be one of allowable filters.
        """
        if not isinstance(value, dict):
            raise TypeError("Beacon scanner filters must be a dictionary.")
        elif not all(key in ALLOWABLE_FILTERS for key in value):
            raise KeyError("Beacon scanner filters must be one of allowable "
                    f"filters {ALLOWABLE_FILTERS}.")
        self.__filters = value

    def filter_advertisements(self, advertisements):
        """Filter received beacon advertisements based on filters.

        Identifier filters (keys in ID_FILTERS) keep rows whose column equals
        the filter value. Any other filter is treated as an inclusive
        [low, high] range on the column of the same name.

        Args:
            advertisements (pandas.DataFrame): Parsed advertisements.

        Returns:
            Advertisements with all entries that were not compliant with the
            filters removed, re-indexed from 0.
        """
        for key, value in self.filters.items():
            # Filter based on fixed identifiers
            if key in ID_FILTERS:
                advertisements = advertisements[advertisements[key].isin([value])]
            # Filter based on measurements
            else:
                query_str = f"{value[0]} <= {key} and {key} <= {value[1]}"
                advertisements = advertisements.query(query_str)
        # Return a re-indexed copy rather than mutating a filtered slice
        return advertisements.reset_index(drop=True)

    def process_scans(self, scans, timestamps):
        """Process collection of received beacon advertisement scans.

        Organize collection of received beacon advertisement scans according
        to address, payload, and measurements.

        Args:
            scans (list): Received beacon advertisement scans. Each element
                contains all advertisements received from one scan, as a
                mapping of address to payload; payload items 0-4 are read as
                UUID, major, minor, TX power and RSSI. Elements are in
                temporal order.
            timestamps (list): Timestamps associated with each scan.

        Returns:
            Advertisements organized in a pandas.DataFrame by address first,
            timestamp second, and then remainder of advertisement payload,
            e.g., UUID, major, minor, etc.
        """
        # Collect all advertisements
        advertisements = []
        for (scan, timestamp) in zip_longest(scans, timestamps):
            for address, payload in scan.items():
                advertisement = {'ADDRESS': address, 'TIMESTAMP': timestamp}
                advertisement['UUID'] = payload[0]
                advertisement['MAJOR'] = payload[1]
                advertisement['MINOR'] = payload[2]
                advertisement['TX POWER'] = payload[3]
                advertisement['RSSI'] = payload[4]
                advertisements.append(advertisement)
        # Format into DataFrame
        return pd.DataFrame(advertisements, columns=['ADDRESS', 'TIMESTAMP',
            'UUID', 'MAJOR', 'MINOR', 'TX POWER', 'RSSI'])

    def scan(self, scan_prefix='', timeout=0, revisit=1):
        """Execute BLE beacon scan.

        Scans repeatedly until the timeout elapses or the control file
        contains anything other than "0", then writes the filtered results
        to "<scan_prefix>_<start timestamp>.csv".

        Args:
            scan_prefix (str): Scan output file prefix. Final output file name
                will be appended with first scan start timestamp. Defaults to
                configuration value.
            timeout (int, float): Time (s) for which to scan. If specified as
                None then scans till user commanded stop via control file.
                The default of 0 is a sentinel meaning "use the configuration
                value".
            revisit (int): Time interval (s) between consecutive scans.
                NOTE: this argument is currently not used; the configured
                ``self.revisit`` is what is passed to the scan service.

        Returns:
            Filtered advertisements organized in a pandas.DataFrame by address
            first, timestamp second, and then remainder of advertisement
            payload, e.g., UUID, major, minor, etc.
        """
        # Parse inputs
        if scan_prefix == '':
            scan_prefix = self.scan_prefix
        if timeout == 0:
            timeout = self.timeout
        # Update control file and scan output file
        with self.__control_file.open(mode='w') as f:
            f.write("0")
        scan_file = Path(f"{scan_prefix}_{datetime.now():%Y%m%dT%H%M%S}.csv")
        # Start scanning
        self.__logger.info(f"Starting beacon scanner with timeout {timeout}.")
        timestamps = []
        scans = []
        scan_count = 0
        try:
            self.__control_file_handle = self.__control_file.open(mode='r+')
            run = True
            start_time = time.monotonic()
            while run:
                scan_count += 1
                self.__logger.debug(f"Performing scan #{scan_count} at revisit "
                        f"{self.revisit}.")
                timestamps.append(datetime.now())
                scans.append(self.__service.scan(self.revisit))
                # Stop scanning based on either timeout or control file
                if timeout is not None:
                    if (time.monotonic()-start_time) > timeout:
                        self.__logger.debug("Beacon scanner timed out.")
                        run = False
                self.__control_file_handle.seek(0)
                control_flag = self.__control_file_handle.read()
                if control_flag != "0":
                    self.__logger.debug("Beacon scanner control flag set to stop.")
                    run = False
        finally:
            # Always release the handle and reset the flag, even when the
            # scan loop is interrupted by an exception.
            self.__logger.info("Stopping beacon scanner.")
            # Cleanup
            if self.__control_file_handle is not None:
                self.__control_file_handle.close()
                self.__control_file_handle = None
            with self.__control_file.open('w') as f:
                f.write("0")
        # Process, filter, and output received scans
        advertisements = self.process_scans(scans, timestamps)
        advertisements = self.filter_advertisements(advertisements)
        advertisements.to_csv(scan_file, index_label='SCAN')
        return advertisements


def setup_logger(config):
    """Setup and return logger based on configuration.

    Args:
        config (dict): Logger section of the configuration; 'config' is
            passed to logging.config.dictConfig and 'name' selects the logger.
    """
    logging.config.dictConfig(config['config'])
    return logging.getLogger(config['name'])


def close_logger(logger):
    """Close logger by closing and detaching each of its handlers."""
    # Iterate over a copy because handlers are removed while looping
    for handler in logger.handlers[:]:
        handler.close()
        logger.removeHandler(handler)


def load_config(parsed_args):
    """Load configuration.

    Loads beacon/scanner configuration from parsed input argument. Any
    expected keys not specified use values from default configuration.
    DEFAULT_CONFIG itself is never modified; a copy is returned.

    Args:
        parsed_args (dict): Parsed input arguments as returned by parse_args.

    Returns:
        Configuration dictionary.
    """
    # Load default configuration if none specified
    if parsed_args['config_yml'] is None:
        config = copy.deepcopy(DEFAULT_CONFIG)
    # Load configuration YAML
    else:
        with open(parsed_args['config_yml'], 'r') as f:
            # An empty YAML file loads as None
            config = yaml.load(f, Loader=yaml.SafeLoader) or {}
        for section in ('advertiser', 'scanner'):
            config[section] = {**copy.deepcopy(DEFAULT_CONFIG[section]),
                    **(config.get(section) or {})}
        if not config.get('logger'):
            config['logger'] = copy.deepcopy(DEFAULT_CONFIG['logger'])
    # Merge configuration values with command line options
    for key, value in parsed_args.items():
        if value is not None:
            if key in config['advertiser']:
                config['advertiser'][key] = value
            if key in config['scanner']:
                config['scanner'][key] = value
    # Remove malformed filters
    if config['scanner']['filters'] is not None:
        filters_to_remove = []
        for key, value in config['scanner']['filters'].items():
            if key not in ALLOWABLE_FILTERS:
                filters_to_remove.append(key)
            elif value is None:
                filters_to_remove.append(key)
            elif key in MEASUREMENT_FILTERS and (
                    not isinstance(value, (list, tuple)) or len(value) != 2):
                # Measurement filters must be a [low, high] pair
                filters_to_remove.append(key)
        for filter_to_remove in filters_to_remove:
            del config['scanner']['filters'][filter_to_remove]
    return config


def parse_args(args):
    """Input argument parser.

    Args:
        args (list): Input arguments as taken from sys.argv.

    Returns:
        Dictionary containing parsed input arguments. Keys are argument names.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description=("BLE beacon advertiser or scanner. Command line "
                     "arguments will override their corresponding value in "
                     "a configuration file if specified."))
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument('-a', '--advertiser', action='store_true',
                            help="Beacon advertiser mode.")
    mode_group.add_argument('-s', '--scanner', action='store_true',
                            help="Beacon scanner mode.")
    parser.add_argument('--config_yml', help="Configuration YAML.")
    parser.add_argument('--control_file', help="Control file.")
    parser.add_argument('--scan_prefix', help="Scan output file prefix.")
    parser.add_argument('--timeout', type=float,
            help="Timeout (s) for both beacon advertiser and  scanner modes.")
    parser.add_argument('--uuid', help="Beacon advertiser UUID.")
    parser.add_argument('--major', type=int,
            help="Beacon advertiser major value.")
    parser.add_argument('--minor', type=int,
            help="Beacon advertiser minor value.")
    parser.add_argument('--tx_power', type=int,
            help="Beacon advertiser TX power.")
    parser.add_argument('--interval', type=int,
            help="Beacon advertiser interval (ms).")
    parser.add_argument('--revisit', type=int,
            help="Beacon scanner revisit interval (s)")
    return vars(parser.parse_args(args))


def main(args):
    """Creates beacon and either starts advertising or scanning.

    Any exception raised while advertising or scanning is logged rather
    than propagated.

    Args:
        args (list): Arguments as provided by sys.argv.

    Returns:
        If advertising then no output (None) is returned. If scanning
        then scanned advertisements are returned in pandas.DataFrame.
        None is also returned when a fatal exception was logged.
    """
    # Initial setup
    parsed_args = parse_args(args)
    config = load_config(parsed_args)
    logger = setup_logger(config['logger'])
    logger.debug(f"Beacon configuration - {config['advertiser']}")
    logger.debug(f"Scanner configuration - {config['scanner']}")

    # Bound up front so the return below is valid after a logged failure
    output = None
    # Create and start beacon advertiser or scanner
    try:
        if parsed_args['advertiser']:
            logger.info("Beacon advertiser mode selected.")
            advertiser = Advertiser(logger, **config['advertiser'])
            advertiser.advertise()
        elif parsed_args['scanner']:
            logger.info("Beacon scanner mode selected.")
            scanner = Scanner(logger, **config['scanner'])
            output = scanner.scan()
    except Exception:
        logger.exception("Fatal exception encountered")
    finally:
        close_logger(logger)
    return output


if __name__ == "__main__":
    # Script execution.
    main(sys.argv[1:])
# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub.
LambdaTest Learning Hub covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
