Best Python code snippet using playwright-python
shell.py
Source:shell.py  
1# Copyright 2010 Jacob Kaplan-Moss2# Copyright 2011 X7 LLC.3# All Rights Reserved.4#5#    Licensed under the Apache License, Version 2.0 (the "License"); you may6#    not use this file except in compliance with the License. You may obtain7#    a copy of the License at8#9#         http://www.apache.org/licenses/LICENSE-2.010#11#    Unless required by applicable law or agreed to in writing, software12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the14#    License for the specific language governing permissions and limitations15#    under the License.16import getpass17import os18from engineclient import exceptions19from engineclient import utils20from engineclient.v1_1 import servers21AUTO_KEY = object()22def _boot(cs, args, reservation_id=None, min_count=None, max_count=None):23    """Boot a new server."""24    if min_count is None:25        min_count = 126    if max_count is None:27        max_count = min_count28    if min_count > max_count:29        raise exceptions.CommandError("min_instances should be <= "30                                      "max_instances")31    if not min_count or not max_count:32        raise exceptions.CommandError("min_instances nor max_instances should"33                                      "be 0")34    if not args.image and not args.block_device_mapping:35        raise exceptions.CommandError("you need to specify an Image ID "36                                      "or a block device mapping ")37    if not args.flavor:38        raise exceptions.CommandError("you need to specify a Flavor ID ")39    flavor = args.flavor40    image = args.image41    meta = dict(v.split('=') for v in args.meta)42    files = {}43    for f in args.files:44        dst, src = f.split('=', 1)45        try:46            files[dst] = open(src)47        except IOError, e:48            raise exceptions.CommandError("Can't open '%s': %s" % (src, e))49    # use the os-keypair extension50    key_name = None51    if args.key_name is not None:52        key_name = args.key_name53    # or use file injection functionality (independent of os-keypair extension)54    keyfile = None55    if args.key_path is AUTO_KEY:56        possible_keys = [os.path.join(os.path.expanduser('~'), '.ssh', k)57                         for k in ('id_dsa.pub', 'id_rsa.pub')]58        for k in possible_keys:59            if os.path.exists(k):60                keyfile = k61                break62        else:63            raise exceptions.CommandError("Couldn't find a key file: tried "64                               "~/.ssh/id_dsa.pub or ~/.ssh/id_rsa.pub")65    elif args.key_path:66        keyfile = args.key_path67    if keyfile:68        try:69            files['/root/.ssh/authorized_keys2'] = open(keyfile)70        except IOError, e:71            raise exceptions.CommandError("Can't open '%s': %s" % (keyfile, e))72    if args.user_data:73        try:74            userdata = open(args.user_data)75        except IOError, e:76            raise exceptions.CommandError("Can't open '%s': %s" % \77                                          (args.user_data, e))78    else:79        userdata = None80    if args.availability_zone:81        availability_zone = args.availability_zone82    else:83        availability_zone = None84    if args.security_groups:85        security_groups = args.security_groups.split(',')86    else:87        security_groups = None88    block_device_mapping = {}89    for bdm in args.block_device_mapping:90        device_name, mapping = bdm.split('=', 1)91        block_device_mapping[device_name] = mapping92    nics = []93    for nic_str in args.nics:94        nic_info = {"net-id": "", "v4-fixed-ip": ""}95        for kv_str in nic_str.split(","):96            k, v = kv_str.split("=")97            nic_info[k] = v98        nics.append(nic_info)99    boot_args = [args.name, image, flavor]100    boot_kwargs = dict(101            meta=meta,102            files=files,103            key_name=key_name,104            reservation_id=reservation_id,105            min_count=min_count,106            max_count=max_count,107            userdata=userdata,108            availability_zone=availability_zone,109            security_groups=security_groups,110            block_device_mapping=block_device_mapping,111            nics=nics)112    return boot_args, boot_kwargs113@utils.arg('--flavor',114     default=None,115     metavar='<flavor>',116     help="Flavor ID (see 'engine flavor-list').")117@utils.arg('--image',118     default=None,119     metavar='<image>',120     help="Image ID (see 'engine image-list'). ")121@utils.arg('--meta',122     metavar="<key=value>",123     action='append',124     default=[],125     help="Record arbitrary key/value metadata. "\126          "May be give multiple times.")127@utils.arg('--file',128     metavar="<dst-path=src-path>",129     action='append',130     dest='files',131     default=[],132     help="Store arbitrary files from <src-path> locally to <dst-path> "\133          "on the new server. You may store up to 5 files.")134@utils.arg('--key_path',135     metavar='<key_path>',136     nargs='?',137     const=AUTO_KEY,138     help="Key the server with an SSH keypair. "\139          "Looks in ~/.ssh for a key, "\140          "or takes an explicit <path> to one. (uses --file functionality)")141@utils.arg('--key_name',142     metavar='<key_name>',143     help="Key name of keypair that should be created earlier with \144           the command keypair-add")145@utils.arg('name', metavar='<name>', help='Name for the new server')146@utils.arg('--user_data',147     default=None,148     metavar='<user-data>',149     help="user data file to pass to be exposed by the metadata server.")150@utils.arg('--availability_zone',151     default=None,152     metavar='<availability-zone>',153     help="zone id.")154@utils.arg('--security_groups',155     default=None,156     metavar='<security_groups>',157     help="comma separated list of security group names.")158@utils.arg('--block_device_mapping',159     metavar="<dev_name=mapping>",160     action='append',161     default=[],162     help="Block device mapping in the format "163         "<dev_name=<id>:<type>:<size(GB)>:<delete_on_terminate>.")164@utils.arg('--nic',165     metavar="<net-id=net-uuid,v4-fixed-ip=ip-addr>",166     action='append',167     dest='nics',168     default=[],169     help="Create a NIC on the server.\n"170           "Specify option multiple times to create multiple NICs.\n"171           "net-id: attach NIC to network with this UUID (optional)\n"172           "v4-fixed-ip: IPv4 fixed address for NIC (optional).")173def do_boot(cs, args):174    """Boot a new server."""175    boot_args, boot_kwargs = _boot(cs, args)176    extra_boot_kwargs = utils.get_resource_manager_extra_kwargs(do_boot, args)177    boot_kwargs.update(extra_boot_kwargs)178    server = cs.servers.create(*boot_args, **boot_kwargs)179    # Keep any information (like adminPass) returned by create180    info = server._info181    server = cs.servers.get(info['id'])182    info.update(server._info)183    flavor = info.get('flavor', {})184    flavor_id = flavor.get('id', '')185    info['flavor'] = _find_flavor(cs, flavor_id).name186    image = info.get('image', {})187    image_id = image.get('id', '')188    info['image'] = _find_image(cs, image_id).name189    info.pop('links', None)190    info.pop('addresses', None)191    utils.print_dict(info)192@utils.arg('--flavor',193     default=None,194     metavar='<flavor>',195     help="Flavor ID (see 'engine flavor-list')")196@utils.arg('--image',197     default=None,198     metavar='<image>',199     help="Image ID (see 'engine image-list').")200@utils.arg('--meta',201     metavar="<key=value>",202     action='append',203     default=[],204     help="Record arbitrary key/value metadata. "\205          "May be give multiple times.")206@utils.arg('--file',207     metavar="<dst-path=src-path>",208     action='append',209     dest='files',210     default=[],211     help="Store arbitrary files from <src-path> locally to <dst-path> "\212          "on the new server. You may store up to 5 files.")213@utils.arg('--key',214     metavar='<path>',215     nargs='?',216     const=AUTO_KEY,217     help="Key the server with an SSH keypair. "\218          "Looks in ~/.ssh for a key, "\219          "or takes an explicit <path> to one.")220@utils.arg('--reservation_id',221     default=None,222     metavar='<reservation_id>',223     help="Reservation ID (a UUID). "\224          "If unspecified will be generated by the server.")225@utils.arg('--min_instances',226     default=None,227     type=int,228     metavar='<number>',229     help="The minimum number of instances to build. "\230             "Defaults to 1.")231@utils.arg('--max_instances',232     default=None,233     type=int,234     metavar='<number>',235     help="The maximum number of instances to build. "\236             "Defaults to 'min_instances' setting.")237@utils.arg('name', metavar='<name>', help='Name for the new server')238def do_zone_boot(cs, args):239    """Boot a new server, potentially across Zones."""240    boot_args, boot_kwargs = _boot(cs,241                                   args,242                                   reservation_id=args.reservation_id,243                                   min_count=args.min_instances,244                                   max_count=args.max_instances)245    extra_boot_kwargs = utils.get_resource_manager_extra_kwargs(246            do_zone_boot, args)247    boot_kwargs.update(extra_boot_kwargs)248    reservation_id = cs.zones.boot(*boot_args, **boot_kwargs)249    print "Reservation ID=", reservation_id250def _translate_flavor_keys(collection):251    convert = [('ram', 'memory_mb'), ('disk', 'local_gb')]252    for item in collection:253        keys = item.__dict__.keys()254        for from_key, to_key in convert:255            if from_key in keys and to_key not in keys:256                setattr(item, to_key, item._info[from_key])257def do_flavor_list(cs, args):258    """Print a list of available 'flavors' (sizes of servers)."""259    flavors = cs.flavors.list()260    _translate_flavor_keys(flavors)261    utils.print_list(flavors, [262        'ID',263        'Name',264        'Memory_MB',265        'Swap',266        'Local_GB',267        'VCPUs',268        'RXTX_Factor'])269def do_image_list(cs, args):270    """Print a list of available images to boot from."""271    image_list = cs.images.list()272    def parse_server_name(image):273        try:274            return image.server['id']275        except (AttributeError, KeyError):276            return ''277    fmts = {'Server': parse_server_name}278    utils.print_list(image_list, ['ID', 'Name', 'Status', 'Server'], fmts)279@utils.arg('image',280     metavar='<image>',281     help="Name or ID of image")282@utils.arg('action',283     metavar='<action>',284     choices=['set', 'delete'],285     help="Actions: 'set' or 'delete'")286@utils.arg('metadata',287     metavar='<key=value>',288     nargs='+',289     action='append',290     default=[],291     help='Metadata to add/update or delete (only key is necessary on delete)')292def do_image_meta(cs, args):293    """Set or Delete metadata on an image."""294    image = _find_image(cs, args.image)295    metadata = {}296    for metadatum in args.metadata[0]:297        # Can only pass the key in on 'delete'298        # So this doesn't have to have '='299        if metadatum.find('=') > -1:300            (key, value) = metadatum.split('=', 1)301        else:302            key = metadatum303            value = None304        metadata[key] = value305    if args.action == 'set':306        cs.images.set_meta(image, metadata)307    elif args.action == 'delete':308        cs.images.delete_meta(image, metadata.keys())309def _print_image(image):310    info = image._info.copy()311    # ignore links, we don't need to present those312    info.pop('links')313    # try to replace a server entity to just an id314    server = info.pop('server', None)315    try:316        info['server'] = server['id']317    except (KeyError, TypeError):318        pass319    # break up metadata and display each on its own row320    metadata = info.pop('metadata', {})321    try:322        for key, value in metadata.items():323            _key = 'metadata %s' % key324            info[_key] = value325    except AttributeError:326        pass327    utils.print_dict(info)328@utils.arg('image',329     metavar='<image>',330     help="Name or ID of image")331def do_image_show(cs, args):332    """Show details about the given image."""333    image = _find_image(cs, args.image)334    _print_image(image)335@utils.arg('image', metavar='<image>', help='Name or ID of image.')336def do_image_delete(cs, args):337    """338    Delete an image.339    It should go without saying, but you can only delete images you340    created.341    """342    image = _find_image(cs, args.image)343    image.delete()344@utils.arg('--reservation_id',345    dest='reservation_id',346    metavar='<reservation_id>',347    default=None,348    help='Only return instances that match reservation_id.')349@utils.arg('--recurse_zones',350    dest='recurse_zones',351    metavar='<0|1>',352    nargs='?',353    type=int,354    const=1,355    default=0,356    help='Recurse through all zones if set.')357@utils.arg('--ip',358    dest='ip',359    metavar='<ip_regexp>',360    default=None,361    help='Search with regular expression match by IP address')362@utils.arg('--ip6',363    dest='ip6',364    metavar='<ip6_regexp>',365    default=None,366    help='Search with regular expression match by IPv6 address')367@utils.arg('--name',368    dest='name',369    metavar='<name_regexp>',370    default=None,371    help='Search with regular expression match by name')372@utils.arg('--instance_name',373    dest='instance_name',374    metavar='<name_regexp>',375    default=None,376    help='Search with regular expression match by instance name')377@utils.arg('--status',378    dest='status',379    metavar='<status>',380    default=None,381    help='Search by server status')382@utils.arg('--flavor',383    dest='flavor',384    metavar='<flavor>',385    type=int,386    default=None,387    help='Search by flavor ID')388@utils.arg('--image',389    dest='image',390    metavar='<image>',391    default=None,392    help='Search by image ID')393@utils.arg('--host',394    dest='host',395    metavar='<hostname>',396    default=None,397    help='Search instances by hostname to which they are assigned')398def do_list(cs, args):399    """List active servers."""400    recurse_zones = args.recurse_zones401    search_opts = {402            'reservation_id': args.reservation_id,403            'recurse_zones': recurse_zones,404            'ip': args.ip,405            'ip6': args.ip6,406            'name': args.name,407            'image': args.image,408            'flavor': args.flavor,409            'status': args.status,410            'host': args.host,411            'instance_name': args.instance_name}412    if recurse_zones:413        id_col = 'UUID'414    else:415        id_col = 'ID'416    columns = [id_col, 'Name', 'Status', 'Networks']417    formatters = {'Networks': utils._format_servers_list_networks}418    utils.print_list(cs.servers.list(search_opts=search_opts), columns,419                     formatters)420@utils.arg('--hard',421    dest='reboot_type',422    action='store_const',423    const=servers.REBOOT_HARD,424    default=servers.REBOOT_SOFT,425    help='Perform a hard reboot (instead of a soft one).')426@utils.arg('server', metavar='<server>', help='Name or ID of server.')427def do_reboot(cs, args):428    """Reboot a server."""429    _find_server(cs, args.server).reboot(args.reboot_type)430@utils.arg('server', metavar='<server>', help='Name or ID of server.')431@utils.arg('image', metavar='<image>', help="Name or ID of new image.")432@utils.arg('--rebuild_password', dest='rebuild_password',433           metavar='<rebuild_password>', default=False,434           help="Set the provided password on the rebuild instance.")435def do_rebuild(cs, args):436    """Shutdown, re-image, and re-boot a server."""437    server = _find_server(cs, args.server)438    image = _find_image(cs, args.image)439    if args.rebuild_password is not False:440        _password = args.rebuild_password441    else:442        _password = None443    s = server.rebuild(image, _password)444    _print_server(cs, s)445@utils.arg('server', metavar='<server>',446           help='Name (old name) or ID of server.')447@utils.arg('name', metavar='<name>', help='New name for the server.')448def do_rename(cs, args):449    """Rename a server."""450    _find_server(cs, args.server).update(name=args.name)451@utils.arg('server', metavar='<server>', help='Name or ID of server.')452@utils.arg('flavor', metavar='<flavor>', help="Name or ID of new flavor.")453def do_resize(cs, args):454    """Resize a server."""455    server = _find_server(cs, args.server)456    flavor = _find_flavor(cs, args.flavor)457    server.resize(flavor)458@utils.arg('server', metavar='<server>', help='Name or ID of server.')459def do_resize_confirm(cs, args):460    """Confirm a previous resize."""461    _find_server(cs, args.server).confirm_resize()462@utils.arg('server', metavar='<server>', help='Name or ID of server.')463def do_resize_revert(cs, args):464    """Revert a previous resize (and return to the previous VM)."""465    _find_server(cs, args.server).revert_resize()466@utils.arg('server', metavar='<server>', help='Name or ID of server.')467def do_migrate(cs, args):468    """Migrate a server."""469    _find_server(cs, args.server).migrate()470@utils.arg('server', metavar='<server>', help='Name or ID of server.')471def do_pause(cs, args):472    """Pause a server."""473    _find_server(cs, args.server).pause()474@utils.arg('server', metavar='<server>', help='Name or ID of server.')475def do_unpause(cs, args):476    """Unpause a server."""477    _find_server(cs, args.server).unpause()478@utils.arg('server', metavar='<server>', help='Name or ID of server.')479def do_suspend(cs, args):480    """Suspend a server."""481    _find_server(cs, args.server).suspend()482@utils.arg('server', metavar='<server>', help='Name or ID of server.')483def do_resume(cs, args):484    """Resume a server."""485    _find_server(cs, args.server).resume()486@utils.arg('server', metavar='<server>', help='Name or ID of server.')487def do_rescue(cs, args):488    """Rescue a server."""489    _find_server(cs, args.server).rescue()490@utils.arg('server', metavar='<server>', help='Name or ID of server.')491def do_unrescue(cs, args):492    """Unrescue a server."""493    _find_server(cs, args.server).unrescue()494@utils.arg('server', metavar='<server>', help='Name or ID of server.')495def do_diagnostics(cs, args):496    """Retrieve server diagnostics."""497    utils.print_dict(cs.servers.diagnostics(args.server)[1])498@utils.arg('server', metavar='<server>', help='Name or ID of server.')499def do_actions(cs, args):500    """Retrieve server actions."""501    utils.print_list(502        cs.servers.actions(args.server),503        ["Created_At", "Action", "Error"])504@utils.arg('server', metavar='<server>', help='Name or ID of server.')505def do_root_password(cs, args):506    """507    Change the root password for a server.508    """509    server = _find_server(cs, args.server)510    p1 = getpass.getpass('New password: ')511    p2 = getpass.getpass('Again: ')512    if p1 != p2:513        raise exceptions.CommandError("Passwords do not match.")514    server.change_password(p1)515@utils.arg('server', metavar='<server>', help='Name or ID of server.')516@utils.arg('name', metavar='<name>', help='Name of snapshot.')517def do_image_create(cs, args):518    """Create a new image by taking a snapshot of a running server."""519    server = _find_server(cs, args.server)520    cs.servers.create_image(server, args.name)521@utils.arg('server',522     metavar='<server>',523     help="Name or ID of server")524@utils.arg('action',525     metavar='<action>',526     choices=['set', 'delete'],527     help="Actions: 'set' or 'delete'")528@utils.arg('metadata',529     metavar='<key=value>',530     nargs='+',531     action='append',532     default=[],533     help='Metadata to set or delete (only key is necessary on delete)')534def do_meta(cs, args):535    """Set or Delete metadata on a server."""536    server = _find_server(cs, args.server)537    metadata = {}538    for metadatum in args.metadata[0]:539        # Can only pass the key in on 'delete'540        # So this doesn't have to have '='541        if metadatum.find('=') > -1:542            (key, value) = metadatum.split('=', 1)543        else:544            key = metadatum545            value = None546        metadata[key] = value547    if args.action == 'set':548        cs.servers.set_meta(server, metadata)549    elif args.action == 'delete':550        cs.servers.delete_meta(server, metadata.keys())551def _print_server(cs, server):552    # By default when searching via name we will do a553    # findall(name=blah) and due a REST /details which is not the same554    # as a .get() and doesn't get the information about flavors and555    # images. This fix it as we redo the call with the id which does a556    # .get() to get all informations.557    if not 'flavor' in server._info:558        server = _find_server(cs, server.id)559    networks = server.networks560    info = server._info.copy()561    for network_label, address_list in networks.items():562        info['%s network' % network_label] = ', '.join(address_list)563    flavor = info.get('flavor', {})564    flavor_id = flavor.get('id', '')565    info['flavor'] = _find_flavor(cs, flavor_id).name566    image = info.get('image', {})567    image_id = image.get('id', '')568    info['image'] = _find_image(cs, image_id).name569    info.pop('links', None)570    info.pop('addresses', None)571    utils.print_dict(info)572@utils.arg('server', metavar='<server>', help='Name or ID of server.')573def do_show(cs, args):574    """Show details about the given server."""575    s = _find_server(cs, args.server)576    _print_server(cs, s)577@utils.arg('server', metavar='<server>', help='Name or ID of server.')578def do_delete(cs, args):579    """Immediately shut down and delete a server."""580    _find_server(cs, args.server).delete()581def _find_server(cs, server):582    """Get a server by name or ID."""583    return utils.find_resource(cs.servers, server)584def _find_image(cs, image):585    """Get an image by name or ID."""586    return utils.find_resource(cs.images, image)587def _find_flavor(cs, flavor):588    """Get a flavor by name, ID, or RAM size."""589    try:590        return utils.find_resource(cs.flavors, flavor)591    except exceptions.NotFound:592        return cs.flavors.find(ram=flavor)593# --zone_username is required since --username is already used.594@utils.arg('zone', metavar='<zone_id>', help='ID of the zone', default=None)595@utils.arg('--api_url', dest='api_url', default=None, help='New URL.')596@utils.arg('--zone_username', dest='zone_username', default=None,597                        help='New zone username.')598@utils.arg('--zone_password', dest='zone_password', default=None,599                        help='New password.')600@utils.arg('--weight_offset', dest='weight_offset', default=None,601                        help='Child Zone weight offset.')602@utils.arg('--weight_scale', dest='weight_scale', default=None,603                        help='Child Zone weight scale.')604def do_zone(cs, args):605    """Show or edit a child zone. No zone arg for this zone."""606    zone = cs.zones.get(args.zone)607    # If we have some flags, update the zone608    zone_delta = {}609    if args.api_url:610        zone_delta['api_url'] = args.api_url611    if args.zone_username:612        zone_delta['username'] = args.zone_username613    if args.zone_password:614        zone_delta['password'] = args.zone_password615    if args.weight_offset:616        zone_delta['weight_offset'] = args.weight_offset617    if args.weight_scale:618        zone_delta['weight_scale'] = args.weight_scale619    if zone_delta:620        zone.update(**zone_delta)621    else:622        utils.print_dict(zone._info)623def do_zone_info(cs, args):624    """Get this zones name and capabilities."""625    zone = cs.zones.info()626    utils.print_dict(zone._info)627@utils.arg('zone_name', metavar='<zone_name>',628            help='Name of the child zone being added.')629@utils.arg('api_url', metavar='<api_url>', help="URL for the Zone's Auth API")630@utils.arg('--zone_username', metavar='<zone_username>',631            help='Optional Authentication username. (Default=None)',632            default=None)633@utils.arg('--zone_password', metavar='<zone_password>',634           help='Authentication password. (Default=None)',635           default=None)636@utils.arg('--weight_offset', metavar='<weight_offset>',637           help='Child Zone weight offset (Default=0.0))',638           default=0.0)639@utils.arg('--weight_scale', metavar='<weight_scale>',640           help='Child Zone weight scale (Default=1.0).',641           default=1.0)642def do_zone_add(cs, args):643    """Add a new child zone."""644    zone = cs.zones.create(args.zone_name, args.api_url,645                           args.zone_username, args.zone_password,646                           args.weight_offset, args.weight_scale)647    utils.print_dict(zone._info)648@utils.arg('zone', metavar='<zone>', help='Name or ID of the zone')649def do_zone_delete(cs, args):650    """Delete a zone."""651    cs.zones.delete(args.zone)652def do_zone_list(cs, args):653    """List the children of a zone."""654    utils.print_list(cs.zones.list(), ['ID', 'Name', 'Is Active', \655                        'API URL', 'Weight Offset', 'Weight Scale'])656@utils.arg('server', metavar='<server>', help='Name or ID of server.')657@utils.arg('network_id', metavar='<network_id>', help='Network ID.')658def do_add_fixed_ip(cs, args):659    """Add new IP address to network."""660    server = _find_server(cs, args.server)661    server.add_fixed_ip(args.network_id)662@utils.arg('server', metavar='<server>', help='Name or ID of server.')663@utils.arg('address', metavar='<address>', help='IP Address.')664def do_remove_fixed_ip(cs, args):665    """Remove an IP address from a server."""666    server = _find_server(cs, args.server)667    server.remove_fixed_ip(args.address)668def _find_volume(cs, volume):669    """Get a volume by ID."""670    return utils.find_resource(cs.volumes, volume)671def _find_volume_snapshot(cs, snapshot):672    """Get a volume snapshot by ID."""673    return utils.find_resource(cs.volume_snapshots, snapshot)674def _print_volume(cs, volume):675    utils.print_dict(volume._info)676def _print_volume_snapshot(cs, snapshot):677    utils.print_dict(snapshot._info)678def _translate_volume_keys(collection):679    convert = [('displayName', 'display_name')]680    for item in collection:681        keys = item.__dict__.keys()682        for from_key, to_key in convert:683            if from_key in keys and to_key not in keys:684                setattr(item, to_key, item._info[from_key])685def _translate_volume_snapshot_keys(collection):686    convert = [('displayName', 'display_name'), ('volumeId', 'volume_id')]687    for item in collection:688        keys = item.__dict__.keys()689        for from_key, to_key in convert:690            if from_key in keys and to_key not in keys:691                setattr(item, to_key, item._info[from_key])692def do_volume_list(cs, args):693    """List all the volumes."""694    volumes = cs.volumes.list()695    _translate_volume_keys(volumes)696    # Create a list of servers to which the volume is attached697    for vol in volumes:698        servers = [server.get('serverId') for server in vol.attachments]699        setattr(vol, 'attached_to', ','.join(map(str, servers)))700    utils.print_list(volumes, ['ID', 'Status', 'Display Name',701                        'Size', 'Attached to'])702@utils.arg('volume', metavar='<volume>', help='ID of the volume.')703def do_volume_show(cs, args):704    """Show details about a volume."""705    volume = _find_volume(cs, args.volume)706    _print_volume(cs, volume)707@utils.arg('size',708    metavar='<size>',709    type=int,710    help='Size of volume in GB')711@utils.arg('--snapshot_id',712    metavar='<snapshot_id>',713    help='Optional snapshot id to create the volume from. (Default=None)',714    default=None)715@utils.arg('--display_name', metavar='<display_name>',716            help='Optional volume name. (Default=None)',717            default=None)718@utils.arg('--display_description', metavar='<display_description>',719            help='Optional volume description. (Default=None)',720            default=None)721def do_volume_create(cs, args):722    """Add a new volume."""723    cs.volumes.create(args.size,724                        args.snapshot_id,725                        args.display_name,726                        args.display_description)727@utils.arg('volume', metavar='<volume>', help='ID of the volume to delete.')728def do_volume_delete(cs, args):729    """Remove a volume."""730    volume = _find_volume(cs, args.volume)731    volume.delete()732@utils.arg('server',733    metavar='<server>',734    help='Name or ID of server.')735@utils.arg('volume',736    metavar='<volume>',737    type=int,738    help='ID of the volume to attach.')739@utils.arg('device', metavar='<device>',740    help='Name of the device e.g. /dev/vdb.')741def do_volume_attach(cs, args):742    """Attach a volume to a server."""743    cs.volumes.create_server_volume(_find_server(cs, args.server).id,744                                        args.volume,745                                        args.device)746@utils.arg('server',747    metavar='<server>',748    help='Name or ID of server.')749@utils.arg('attachment_id',750    metavar='<volume>',751    type=int,752    help='Attachment ID of the volume.')753def do_volume_detach(cs, args):754    """Detach a volume from a server."""755    cs.volumes.delete_server_volume(_find_server(cs, args.server).id,756                                        args.attachment_id)757def do_volume_snapshot_list(cs, args):758    """List all the snapshots."""759    snapshots = cs.volume_snapshots.list()760    _translate_volume_snapshot_keys(snapshots)761    utils.print_list(snapshots, ['ID', 'Volume ID', 'Status', 'Display Name',762                        'Size'])763@utils.arg('snapshot', metavar='<snapshot>', help='ID of the snapshot.')764def do_volume_snapshot_show(cs, args):765    """Show details about a snapshot."""766    snapshot = _find_volume_snapshot(cs, args.snapshot)767    _print_volume_snapshot(cs, snapshot)768@utils.arg('volume_id',769    metavar='<volume_id>',770    type=int,771    help='ID of the volume to snapshot')772@utils.arg('--force',773    metavar='<True|False>',774    help='Optional flag to indicate whether to snapshot a volume even if its '775        'attached to an instance. (Default=False)',776    default=False)777@utils.arg('--display_name', metavar='<display_name>',778            help='Optional snapshot name. (Default=None)',779            default=None)780@utils.arg('--display_description', metavar='<display_description>',781            help='Optional snapshot description. (Default=None)',782            default=None)783def do_volume_snapshot_create(cs, args):784    """Add a new snapshot."""785    cs.volume_snapshots.create(args.volume_id,786                        args.force,787                        args.display_name,788                        args.display_description)789@utils.arg('snapshot_id',790    metavar='<snapshot_id>',791    help='ID of the snapshot to delete.')792def do_volume_snapshot_delete(cs, args):793    """Remove a snapshot."""794    snapshot = _find_volume_snapshot(cs, args.snapshot_id)795    snapshot.delete()796def _print_floating_ip_list(floating_ips):797    utils.print_list(floating_ips, ['Ip', 'Instance Id', 'Fixed Ip'])798@utils.arg('server', metavar='<server>', help='Name or ID of server.')799@utils.arg('address', metavar='<address>', help='IP Address.')800def do_add_floating_ip(cs, args):801    """Add a floating IP address to a server."""802    server = _find_server(cs, args.server)803    server.add_floating_ip(args.address)804@utils.arg('server', metavar='<server>', help='Name or ID of server.')805@utils.arg('address', metavar='<address>', help='IP Address.')806def do_remove_floating_ip(cs, args):807    """Remove a floating IP address from a server."""808    server = _find_server(cs, args.server)809    server.remove_floating_ip(args.address)810def do_floating_ip_create(cs, args):811    """Allocate a floating IP for the current tenant."""812    _print_floating_ip_list([cs.floating_ips.create()])813@utils.arg('address', metavar='<address>', help='IP of Floating Ip.')814def do_floating_ip_delete(cs, args):815    """De-allocate a floating IP."""816    floating_ips = cs.floating_ips.list()817    for floating_ip in floating_ips:818        if floating_ip.ip == args.address:819            return cs.floating_ips.delete(floating_ip.id)820    raise exceptions.CommandError("Floating ip %s not found.", args.address)821def do_floating_ip_list(cs, args):822    """List floating ips for this tenant."""823    _print_floating_ip_list(cs.floating_ips.list())824def _print_dns_list(dns_entries):825    utils.print_list(dns_entries, ['ip', 'zone', 'name'])826def do_dns_zones(cs, args):827    """Print a list of available dns zones."""828    zones = cs.floating_ip_dns.zones()829    utils.print_list(zones, ['zone'])830@utils.arg('zone', metavar='<zone>', help='DNS zone')831@utils.arg('--ip', metavar='<ip>', help='ip address', default=None)832@utils.arg('--name', metavar='<name>', help='DNS name', default=None)833def do_dns_list(cs, args):834    """List current DNS entries for zone and ip or zone and name."""835    if not (args.ip or args.name):836        raise exceptions.CommandError(837              "You must specify either --ip or --name")838    entries = cs.floating_ip_dns.get_entries(args.zone,839                                             ip=args.ip, name=args.name)840    _print_dns_list(entries)841@utils.arg('zone', metavar='<zone>', help='DNS zone')842@utils.arg('name', metavar='<name>', help='DNS name')843@utils.arg('ip', metavar='<ip>', help='ip address')844@utils.arg('--type', metavar='<type>', help='dns type (e.g. "A")',845           default='A')846def do_dns_create(cs, args):847    """Create a DNS entry for zone, name and ip."""848    entries = cs.floating_ip_dns.create_entry(args.zone, args.name,849                                              args.ip, args.type)850    _print_dns_list([entries])851@utils.arg('zone', metavar='<zone>', help='DNS zone')852@utils.arg('name', metavar='<name>', help='DNS name')853def do_dns_delete(cs, args):854    """Delete the specified DNS entry."""855    cs.floating_ip_dns.delete_entry(args.zone, args.name)856def _print_secgroup_rules(rules):857    class FormattedRule:858        def __init__(self, obj):859            items = (obj if isinstance(obj, dict) else obj._info).items()860            for k, v in items:861                if k == 'ip_range':862                    v = v.get('cidr')863                elif k == 'group':864                    k = 'source_group'865                    v = v.get('name')866                if v is None:867                    v = ''868                setattr(self, k, v)869    rules = [FormattedRule(rule) for rule in rules]870    utils.print_list(rules, ['IP Protocol', 'From Port', 'To Port',871                             'IP Range', 'Source Group'])872def _print_secgroups(secgroups):873    utils.print_list(secgroups, ['Name', 'Description'])874def _get_secgroup(cs, secgroup):875    for s in cs.security_groups.list():876        if secgroup == s.name:877            return s878    raise exceptions.CommandError("Secgroup %s not found" % secgroup)879@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')880@utils.arg('ip_proto', metavar='<ip_proto>', help='ip_proto (icmp, tcp, udp).')881@utils.arg('from_port', metavar='<from_port>', help='Port at start of range.')882@utils.arg('to_port', metavar='<to_port>', help='Port at end of range.')883@utils.arg('cidr', metavar='<cidr>', help='CIDR for address range.')884def do_secgroup_add_rule(cs, args):885    """Add a rule to a security group."""886    secgroup = _get_secgroup(cs, args.secgroup)887    rule = cs.security_group_rules.create(secgroup.id,888                                          args.ip_proto,889                                          args.from_port,890                                          args.to_port,891                                          args.cidr)892    _print_secgroup_rules([rule])893@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')894@utils.arg('ip_proto', metavar='<ip_proto>', help='ip_proto (icmp, tcp, udp).')895@utils.arg('from_port', metavar='<from_port>', help='Port at start of range.')896@utils.arg('to_port', metavar='<to_port>', help='Port at end of range.')897@utils.arg('cidr', metavar='<cidr>', help='CIDR for address range.')898def do_secgroup_delete_rule(cs, args):899    """Delete a rule from a security group."""900    secgroup = _get_secgroup(cs, args.secgroup)901    for rule in secgroup.rules:902        if (rule['ip_protocol'] == args.ip_proto and903            rule['from_port'] == int(args.from_port) and904            rule['to_port'] == int(args.to_port) and905            rule['ip_range']['cidr'] == args.cidr):906            return cs.security_group_rules.delete(rule['id'])907    raise exceptions.CommandError("Rule not found")908@utils.arg('name', metavar='<name>', help='Name of security group.')909@utils.arg('description', metavar='<description>',910           help='Description of security group.')911def do_secgroup_create(cs, args):912    """Create a security group."""913    _print_secgroups([cs.security_groups.create(args.name, args.description)])914@utils.arg('secgroup', metavar='<secgroup>', help='Name of security group.')915def do_secgroup_delete(cs, args):916    """Delete a security group."""917    cs.security_groups.delete(_get_secgroup(cs, args.secgroup))918def do_secgroup_list(cs, args):919    """List security groups for the curent tenant."""920    _print_secgroups(cs.security_groups.list())921@utils.arg('secgroup', metavar='<secgroup>', help='Name of security group.')922def do_secgroup_list_rules(cs, args):923    """List rules for a security group."""924    secgroup = _get_secgroup(cs, args.secgroup)925    _print_secgroup_rules(secgroup.rules)926@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')927@utils.arg('source_group', metavar='<source_group>',928           help='ID of source group.')929@utils.arg('--ip_proto', metavar='<ip_proto>',930           help='ip_proto (icmp, tcp, udp).')931@utils.arg('--from_port', metavar='<from_port>',932           help='Port at start of range.')933@utils.arg('--to_port', metavar='<to_port>', help='Port at end of range.')934def do_secgroup_add_group_rule(cs, args):935    """Add a source group rule to a security group."""936    secgroup = _get_secgroup(cs, args.secgroup)937    source_group = _get_secgroup(cs, args.source_group)938    params = {}939    params['group_id'] = source_group.id940    if args.ip_proto or args.from_port or args.to_port:941        if not (args.ip_proto and args.from_port and args.to_port):942            raise exceptions.CommandError("ip_proto, from_port, and to_port"943                                           " must be specified together")944        params['ip_protocol'] = args.ip_proto945        params['from_port'] = args.from_port946        params['to_port'] = args.to_port947    rule = cs.security_group_rules.create(secgroup.id, **params)948    _print_secgroup_rules([rule])949@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')950@utils.arg('source_group', metavar='<source_group>',951           help='ID of source group.')952@utils.arg('--ip_proto', metavar='<ip_proto>',953           help='ip_proto (icmp, tcp, udp).')954@utils.arg('--from_port', metavar='<from_port>',955           help='Port at start of range.')956@utils.arg('--to_port', metavar='<to_port>', help='Port at end of range.')957def do_secgroup_delete_group_rule(cs, args):958    """Delete a source group rule from a security group."""959    secgroup = _get_secgroup(cs, args.secgroup)960    source_group = _get_secgroup(cs, args.source_group)961    params = {}962    params['group_name'] = source_group.name963    if args.ip_proto or args.from_port or args.to_port:964        if not (args.ip_proto and args.from_port and args.to_port):965            raise exceptions.CommandError("ip_proto, from_port, and to_port"966                                           " must be specified together")967        params['ip_protocol'] = args.ip_proto968        params['from_port'] = int(args.from_port)969        params['to_port'] = int(args.to_port)970    for rule in secgroup.rules:971        if (rule.get('ip_protocol') == params.get('ip_protocol') and972            rule.get('from_port') == params.get('from_port') and973            rule.get('to_port') == params.get('to_port') and974            rule.get('group', {}).get('name') ==\975                     params.get('group_name')):976            return cs.security_group_rules.delete(rule['id'])977    raise exceptions.CommandError("Rule not found")978@utils.arg('name', metavar='<name>', help='Name of key.')979@utils.arg('--pub_key', metavar='<pub_key>', help='Path to a public ssh key.',980           default=None)981def do_keypair_add(cs, args):982    """Create a new key pair for use with instances"""983    name = args.name984    pub_key = args.pub_key985    if pub_key:986        try:987            with open(pub_key) as f:988                pub_key = f.read()989        except IOError, e:990            raise exceptions.CommandError("Can't open or read '%s': %s" % \991                                                          (pub_key, e))992    keypair = cs.keypairs.create(name, pub_key)993    if not pub_key:994        private_key = keypair.private_key995        print private_key996@utils.arg('name', metavar='<name>', help='Keypair name to delete.')997def do_keypair_delete(cs, args):998    """Delete keypair by its id"""999    name = args.name1000    cs.keypairs.delete(name)1001def do_keypair_list(cs, args):1002    """Print a list of keypairs for a user"""1003    keypairs = cs.keypairs.list()1004    columns = ['Name', 'Fingerprint']1005    utils.print_list(keypairs, columns)1006def do_absolute_limits(cs, args):1007    """Print a list of absolute limits for a user"""1008    limits = cs.limits.get().absolute1009    columns = ['Name', 'Value']1010    utils.print_list(limits, columns)1011def do_rate_limits(cs, args):1012    """Print a list of rate limits for a user"""1013    limits = cs.limits.get().rate1014    columns = ['Verb', 'URI', 'Value', 'Remain', 'Unit', 'Next_Available']...trape.py
Source:trape.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3#**4#5#########6# trape #7#########8#9# trape depends of this file10# For full copyright information this visit: https://github.com/jofpin/trape11#12# Copyright 2018 by Jose Pino (@jofpin) / <jofpin@gmail.com>13#**14import time15import json16import urllib17from core.dependence import urllib218import http.client 19import argparse20import socket21import sys22import os23from core.utils import utils24import subprocess25import requests26import hashlib, binascii27from threading import Timer28from multiprocessing import Process29import atexit30class Trape(object):31	def __init__(self, stat = 0):32		self.name_trape = "Trape"33		self.version = "2.1"34		self.stats_path = "ngrok"35		self.home_path = utils.generateToken(18)36		self.logout_path = utils.generateToken(6)37		self.remove_path = utils.generateToken(14)38		self.injectURL = utils.generateToken(12) + '.js'39		self.stats_key = utils.generateToken(24)40		self.date_start = time.strftime("%Y-%m-%d - %H:%M:%S")41		self.stat = stat42		self.localIp = '127.0.0.1'43		self.nGrokUrl = ''44		self.JSFiles = ({"path" : "base.js", "src" : utils.generateToken(12)},{"path" : "libs.min.js", "src" : utils.generateToken(12)},{"path" : "login.js", "src" : utils.generateToken(12)},{"path" : "payload.js", "src" : utils.generateToken(12)},{"path" : "trape.js", "src" : utils.generateToken(12)},{"path" : "vscript.js", "src" : utils.generateToken(12)},{"path" : "custom.js", "src" : utils.generateToken(12)},)45		self.CSSFiles = ({"path" : "/static/img/favicon.ico", "src" : utils.generateToken(12)},{"path" : "/static/img/favicon.png", "src" : utils.generateToken(12)},{"path" : "/static/css/base-icons.css", "src" : utils.generateToken(12)},{"path" : "/static/css/styles.css", "src" : utils.generateToken(12)},{"path" : "/static/css/normalize.min.css", "src" : utils.generateToken(12)},{"path": "/static/css/services-icons.css", "src" : utils.generateToken(12)},)46		if self.stat == 1:47			c = http.client.HTTPConnection('www.google.com', timeout=5)48			try:49				c.request("HEAD", "/")50				c.close()51			except Exception as e:52				c.close()53				utils.Go("\033[H\033[J")54				utils.Go(utils.Color['whiteBold'] + "[" + utils.Color['redBold'] + "x" + utils.Color['whiteBold'] + "]" + utils.Color['redBold'] + " " + "NOTICE: " + utils.Color['white'] + "Trape needs Internet connection for working" + "\n\t")55				sys.exit(0)56			if (not(os.path.exists("trape.config"))):57			 	self.trape_config()58			try:59				config_trape = json.load(open("trape.config"))60			except Exception as error:61				os.remove('trape.config')62				self.trape_config()63			self.ngrok = config_trape['ngrok_token']64			self.gmaps = config_trape['gmaps_api_key']65			self.ipinfo = config_trape['ipinfo_api_key']66			if self.gmaps == '':67				self.gmaps = 'AIzaSyA30wEa2DwUuddmNTHvoprhnrB2w_aCWbs'68			self.googl = config_trape['gshortener_api_key']69			if self.googl == '':70				self.googl = 'AIzaSyDHMDTOGo9L1OBl5vRxOVM6vpXOXVp5jCc'71			72			parser = argparse.ArgumentParser("python3 trape.py -u <<Url>> -p <<Port>>")73			parser.add_argument('-u', '--url', dest='url', help='Put the web page url to clone')74			parser.add_argument('-p', '--port', dest='port', help='Insert your port')75			parser.add_argument('-ak', '--accesskey', dest='accesskey', help='Insert your custom key access')76			parser.add_argument('-l', '--local', dest='local', help='Insert your home file')77			parser.add_argument('-n', '--ngrok', dest='ngrok', help='Insert your ngrok Authtoken', action='store_true')78			parser.add_argument('-ic', '--injectcode', dest='injc', help='Insert your custom REST API path')79			parser.add_argument('-ud', '--update', dest='update', action='store_true', default=False, help='Update trape to the latest version')80			options = parser.parse_args()81			self.type_lure = 'global'82			# Check current updates83			if options.update:84				utils.Go("\033[H\033[J")85				utils.Go("Updating..." + " " + utils.Color['blue'] + "trape" + utils.Color['white'] + "..." + "\n")86				subprocess.check_output(["git", "reset", "--hard", "origin/master"])87				subprocess.check_output(["git", "pull"])88				utils.Go("Trape Updated... Please execute again...")89				sys.exit(0)90			if options.url is None:91				utils.Go("\033[H\033[J")92				utils.Go("----------------------------------------------")93				utils.Go("" + " " + utils.Color['redBold'] + "TRAPE" + utils.Color['white'] +" {" + utils.Color['yellowBold'] + "stable" + utils.Color['white'] + "}" + utils.Color['white'] + " - " + "Osint and analytics tool" + " " + "<" +utils.Color['white'])94				utils.Go("----------------------------------------------")95				utils.Go("| v" + utils.Color['redBold'] + self.version + utils.Color['white'] + " |")    96				utils.Go("--------" + "\n")97				utils.Go(utils.Color['whiteBold'] + "[" + utils.Color['greenBold'] + "!" + utils.Color['whiteBold'] + "]" + " " + utils.Color['white'] + "Enter the information requested below to complete the execution" + utils.Color['white'])98				utils.Go("")99				options.url = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " Enter a URL to generate the lure" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])100			if options.port is None:101				options.port = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " What is your port to generate the server?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])102			while utils.checkPort(int(options.port)) == False:103				utils.Go("\033[H\033[J")104				utils.Go("----------------------------------------------")105				utils.Go("" + " " + utils.Color['redBold'] + "TRAPE" + utils.Color['white'] +" {" + utils.Color['yellowBold'] + "stable" + utils.Color['white'] + "}" + utils.Color['white'] + " - " + "Osint and analytics tool" + " " + "<" +utils.Color['white'])106				utils.Go("----------------------------------------------")107				utils.Go("\n")108				utils.Go(utils.Color['whiteBold'] + "[" + utils.Color['redBold'] + "x" + utils.Color['whiteBold'] + "]" + utils.Color['redBold'] + " " + "ERROR:" + " " + utils.Color['whiteBold'] + "The port: " + options.port + utils.Color['white'] + " " + "is not available, It was previously used (" + utils.Color['yellow'] + "Use another port" + utils.Text['end'] + ")" + "\n\n")109				options.port = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " What is your port to generate the server?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])110			#while utils.checkUrl(str(options.url)) == False:111				options.url = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " Enter a URL to generate the lure" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])112			utils.Go("")113			utils.Go(utils.Color['greenBold'] + "-" + utils.Color['white'] + " Successful " + utils.Color['greenBold'] + "startup" + utils.Color['white'] + ", get lucky on the way!" + utils.Color['white'])114			utils.Go("")115			time.sleep(0.1)116			s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)117			s.connect(("8.8.8.8", 80))118			self.localIp = s.getsockname()[0]119			self.app_port = int(options.port)120			self.url_to_clone = str(options.url)121			if self.url_to_clone[0:4] != 'http':122				self.url_to_clone = 'http://' + self.url_to_clone123			self.victim_path = options.url.replace("http://", "").replace("https://", "")124			if (options.ngrok or (self.ngrok != "")):125				if self.ngrok == '':126					utils.Go("\033[H\033[J")127					self.ngrok = input("What is your nGrok token?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])128				if (self.ngrok != ''):129					from core.ngrok import ngrok130					import os.path as path131					v_ngrok = ngrok(self.ngrok, self.app_port, stat, self.stats_path)132				else:133					utils.Go(utils.Color['whiteBold'] + "[" + utils.Color['redBold'] + "x" + utils.Color['whiteBold'] + "]" + utils.Color['redBold'] + " " + "ERROR: " + " " + utils.Color['white'] + "Your nGrok authtoken can't be empty")134			135			# Custom name of REST API136			if (options.injc):137				self.injectURL = options.injc138			# Custom access token	139			if (options.accesskey):140			    self.stats_key = options.accesskey141	# Design principal of the header of trape142	def header(self):143		if self.stat == 1:144			# Principal header of tool145			utils.banner()146			# Update verification147			changeLog = requests.get("https://raw.githubusercontent.com/jofpin/trape/master/version.txt", timeout = 4)148			changeLog = changeLog.text.split(" ")[1]149			changeLog = changeLog.strip()150			if changeLog != self.version:151				utils.Go(utils.Color['white'] + "\t" + utils.Color['yellowBold'] + "@" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "["  + utils.Color['whiteBold'] + " " + "UPDATES:" + " " + utils.Color['yellowBold'] + "NEW VERSION IS AVAILABLE: " + utils.Color['white'] + "v" + utils.Color['redBold'] + changeLog + utils.Color['white'] + " " + "(install changes)")152				utils.Go("")153			else:154				utils.Go(utils.Color['white'] + "\t" + utils.Color['yellowBold'] + "@" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "["  + utils.Color['whiteBold'] + " " + "UPDATES:" + " " + utils.Color['greenBold'] + "RUNNING RECENT VERSION" + utils.Color['white'])155				utils.Go("")156			# Local information vars	157			utils.Go(utils.Color['white'] + "\t" + utils.Color['whiteBold'] + "LOCAL INFORMATION" + utils.Text['end'])158			utils.Go("\t" + "-------------------")159			utils.Go(utils.Color['white'] + "\t" + utils.Color['green'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "["  + utils.Color['white'] + " Lure for the users: " + utils.Color['blue'] + 'http://' + self.localIp + ':' + str(self.app_port) + '/' + self.victim_path)160			utils.Go(utils.Color['white'] + "\t" + utils.Color['green'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "["  + utils.Color['white'] + " Your REST API path: " + utils.Color['blue'] + 'http://' + self.localIp + ':' + str(self.app_port) + '/' + self.injectURL + utils.Color['white'])161			utils.Go(utils.Color['white'] + "\t" + utils.Color['green'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "["  + utils.Color['white'] + " Control Panel Link: " + utils.Color['blue'] + "http://127.0.0.1:" + utils.Color['blue'] + str(self.app_port) + '/' + self.stats_path)162			utils.Go(utils.Color['white'] + "\t" + utils.Color['green'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "["  + utils.Color['white'] + " Your Access key: " + utils.Color['blue'] + self.stats_key + utils.Color['white'])163			utils.Go("")164			if self.ngrok != '':165				if self.googl == '':166					self.googl = 'AIzaSyCPzcppCT27KTHnxAIQvYhtvB_l8sKGYBs'167				try:168					opener = urllib.request.build_opener()169					pLog = 4040170					ngrokStatus = str(opener.open('http://127.0.0.1:' + str(pLog) + '/api/tunnels').read()).replace('\n', '').replace(' ', '')171					time.sleep(0.5)172					ngrokUrlPos = ngrokStatus.find('ngrok.io')173					if ngrokUrlPos <= 0:174						time.sleep(4)175						ngrokStatus = str(opener.open('http://127.0.0.1:' + str(pLog) + '/api/tunnels').read()).replace('\n', '').replace(' ', '')176						ngrokUrlPos = ngrokStatus.find('ngrok.io')177					if ngrokUrlPos >= 0:178						ngrokStatus = ngrokStatus[ngrokUrlPos-25:ngrokUrlPos+28]179						ngrokUrlPos = ngrokStatus.find('http')180						ngrokUrlPos2 = ngrokStatus.find('.io')181						ngrokStatus = ngrokStatus[ngrokUrlPos: ngrokUrlPos2] + '.io'182						utils.Go(utils.Color['white'] + "\t" + utils.Color['whiteBold'] + "PUBLIC INFORMATION" + utils.Text['end'])183						utils.Go("\t" + "-------------------")184						r = utils.gShortener(self.googl, ngrokStatus.replace('https', 'http') + '/' + self.victim_path)185						self.nGrokUrl = ngrokStatus.replace('https', 'http')186						utils.Go(utils.Color['white'] + "\t" + utils.Color['yellow'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "["  + utils.Color['white'] + " Public lure: " + utils.Color['blue'] + self.nGrokUrl + '/' + self.victim_path + utils.Color['white'])187						utils.Go(utils.Color['white'] + "\t" + utils.Color['yellow'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "["  + utils.Color['white'] + " Control Panel link: " + utils.Color['blue'] + ngrokStatus.replace('https', 'http') + '/' + self.stats_path + utils.Color['white'])188					else:189						utils.Go(utils.Color['red'] + "\t" + utils.Color['green'] + "-" + utils.Color['white'] + "--" + utils.Color['red'] + "=" + utils.Color['white'] + "["  + utils.Color['white'] + " We can't connect with nGrok " + utils.Color['white'])190				except Exception as e:191					utils.Go(utils.Color['white'] + "[" + utils.Color['redBold'] + "x" + utils.Color['whiteBold'] + "]" + utils.Color['redBold'] + " " + "ERROR: " + " " + utils.Color['white'] + e.message)192					utils.Go(utils.Color['red'] + "\t" + utils.Color['green'] + "-" + utils.Color['white'] + "--" + utils.Color['red'] + "=" + utils.Color['white'] + "["  + utils.Color['white'] + " We can't connect with nGrok " + utils.Color['white'])193			utils.Go("\n" + utils.Color['white'])194			utils.Go(utils.Color['white'] + "[" + utils.Color['greenBold'] + ">" + utils.Color['white'] + "]" + utils.Color['whiteBold'] + " " + "Start time:" + " " + utils.Color['white'] + self.date_start)195			utils.Go(utils.Color['white'] + "[" + utils.Color['greenBold'] + "?" + utils.Color['white'] + "]" + utils.Color['white'] + " " + "Do not forget to close " + self.name_trape + ", after use. Press Control C" + " " + utils.Color['white'] + '\n')196			utils.Go(utils.Color['white'] + "[" + utils.Color['greenBold'] + "¡" + utils.Color['white'] + "]" + utils.Color['white'] + " " + "Waiting for the users to fall..." + "\n")197	# Important: in the process of use is possible that will ask for the root198	def rootConnection(self):199		pass200	# Detect operating system, to compose the compatibility			201	def loadCheck(self):202		utils.checkOS()203		204    # the main file (trape.py)205	def main(self):206		import core.sockets207	# Create config file208	def trape_config(self):209		utils.Go("\033[H\033[J")210		utils.Go("----------------------------------------------------------")211		utils.Go("" + " " + utils.Color['redBold'] + "TRAPE" + utils.Color['white'] +" {" + utils.Color['yellowBold'] + "stable" + utils.Color['white'] + "}" + utils.Color['white'] + " - " + "Configuration zone to use the software" + " " + "<" + utils.Color['white'])212		utils.Go("----------------------------------------------------------")213		utils.Go("| v" + utils.Color['redBold'] + self.version + utils.Color['white'] + " |")    214		utils.Go("--------" + "\n")215		utils.Go(utils.Color['whiteBold'] + "GENERAL CONFIG" + utils.Color['white'])216		utils.Go("------")217		utils.Go("Through this section you will configure the resources required \nfor an effective function of trape, please complete the following steps, below. \nKeep in mind that if the data is incorrect this tool will not work." + utils.Color['white'])218		utils.Go("")219		utils.Go(utils.Color['whiteBold'] + "NGROK TOKEN" + utils.Color['white'])220		utils.Go("------")221		utils.Go("In the next section you must enter your Ngrok token, if you do not have \none register at (" + utils.Color['blueBold'] + "https://ngrok.com" + utils.Color['white'] + "), this data is necessary for the generation of public network tunnels.")222		utils.Go("")223		c_nGrokToken = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " Enter your ngrok token" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])224		utils.Go("")225		utils.Go(utils.Color['whiteBold'] + "GOOGLE API" + utils.Color['white'])226		utils.Go("------")227		utils.Go("You must register with the " + utils.Color['blueBold'] + "Google Console" + utils.Color['white'] + ", and get an API for maps and another for shortening. \nBy having these data you complete the settings")228		utils.Go("")229		c_gMapsToken = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " What is your Google Maps Api Key?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])230		c_gOoglToken = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " Enter your Goo.gl (shortener) Api Key (leave it empty if you don't have)" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])231		utils.Go("")232		utils.Go(utils.Color['whiteBold'] + "IP INFO API" + utils.Color['white'])233		utils.Go("------")234		utils.Go("You must register with the " + utils.Color['blueBold'] + "https://ipgeolocation.io" + utils.Color['white'] + ", and get an API for geolocation. \nBy having these data you complete the settings")235		utils.Go("")236		c_ipinfo = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " What is your IP Info Api Key?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])237		utils.Go("")238		utils.Go(utils.Color['greenBold'] + "-" + utils.Color['white'] + " Congratulations! " + utils.Color['greenBold'] + "Successful configuration" + utils.Color['white'] + ", now enjoy Trape!" + utils.Color['white'])239		utils.Go("")240		time.sleep(0.4)241		if (c_nGrokToken != '' and c_gMapsToken != ''):242			v = '{\n\t"ngrok_token" : "' + c_nGrokToken + '",\n\t"gmaps_api_key" : "' + c_gMapsToken + '",\n\t"gshortener_api_key" : "' + c_gOoglToken + '",\n\t"ipinfo_api_key" : "' + c_ipinfo + '"\n}'243			f = open ('trape.config', 'w')244			f.write(v)245			f.close()246		else:247			self.trape_config()248	def injectCSS_Paths(self, code):249		code = code.replace("[FAVICON_HREF]", self.CSSFiles[0]['src'])250		code = code.replace("[FAVICON_PNG_HREF]",self.CSSFiles[1]['src'])251		code = code.replace("[BASE_ICONS_HREF]", self.CSSFiles[2]['src'])252		code = code.replace("[STYLES_HREF]", self.CSSFiles[3]['src'])253		code = code.replace("[NORMALIZE_HREF]", self.CSSFiles[4]['src'])254		code = code.replace("[SERVICES_ICONS_HREF]", self.CSSFiles[5]['src'])255		return code256# Autocompletion of console257if "nt" in os.name:258	pass259else:260	import readline261	readline.parse_and_bind("tab:complete")...test_utils.py
Source:test_utils.py  
1from astropy.tests.helper import pytest2import numpy as np3import stingray.utils as utils4from scipy.stats import sem5np.random.seed(20150907)6class TestRebinData(object):7    @classmethod8    def setup_class(cls):9        cls.dx = 1.010        cls.n = 1011        cls.counts = 2.012        cls.x = np.arange(cls.dx / 2, cls.dx / 2 + cls.n * cls.dx, cls.dx)13        cls.y = np.zeros_like(cls.x) + cls.counts14        cls.yerr = np.sqrt(cls.y)15    def test_new_stepsize(self):16        dx_new = 2.017        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,18                                                       self.yerr)19        assert np.allclose(step_size, dx_new / self.dx)20    def test_arrays(self):21        dx_new = 2.022        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,23                                                       self.yerr)24        assert isinstance(xbin, np.ndarray)25        assert isinstance(ybin, np.ndarray)26    def test_length_matches(self):27        dx_new = 2.028        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,29                                                       self.yerr)30        assert xbin.shape[0] == ybin.shape[0]31    def test_binned_counts(self):32        dx_new = 2.033        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,34                                                       self.yerr)35        ybin_test = np.zeros_like(xbin) + self.counts * dx_new / self.dx36        assert np.allclose(ybin, ybin_test)37    def test_uneven_bins(self):38        dx_new = 1.539        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,40                                                       self.yerr)41        assert np.isclose(xbin[1] - xbin[0], dx_new)42    def test_uneven_binned_counts(self):43        dx_new = 1.544        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,45                                                       self.yerr)46        ybin_test = np.zeros_like(xbin) + self.counts * dx_new / self.dx47        assert np.allclose(ybin_test, ybin)48    def test_rebin_data_should_raise_error_when_method_is_not_allowed(self):49        dx_new = 2.050        with pytest.raises(ValueError):51            utils.rebin_data(self.x, self.y, dx_new, self.yerr,52                             method='not_allowed_method')53    def test_rebin_variable_input_sampling(self):54        x1 = np.linspace(0,10,11)55        x2 = np.linspace(10.33, 20.0, 30)56        x3 = np.linspace(21, 30, 10 )57        x = np.hstack([x1, x2, x3])58        counts = 2.059        y = np.zeros_like(x) + counts60        yerr = np.sqrt(y)61        dx_new = 1.562        xbin, ybin, yerr_bin, step_size = utils.rebin_data(x, y, dx_new, yerr)63        assert np.allclose(ybin, counts * step_size)64        assert len(xbin) == 2065    def test_rebin_variable_input_mean(self):66        x1 = np.linspace(0,10,11)67        x2 = np.linspace(10.33, 20.0, 30)68        x3 = np.linspace(21, 30, 10 )69        x = np.hstack([x1, x2, x3])70        counts = 2.071        y = np.zeros_like(x) + counts72        yerr = np.sqrt(y)73        dx_new = 1.574        xbin, ybin, yerr_bin, step_size = utils.rebin_data(x, y, dx_new, yerr, method="average")75        assert len(xbin) == 2076        assert np.allclose(ybin, counts)77class TestRebinDataLog(object):78    @classmethod79    def setup_class(cls):80        cls.dx = 181        cls.xmax = 2182        cls.xmin = 183        cls.x = np.arange(cls.xmin, cls.xmax, cls.dx)84        cls.y = np.arange(cls.xmin, cls.xmax, cls.dx)85        cls.y_err = np.ones_like(cls.y)86        cls.true_bins = np.array(87            [1., 1.1, 1.21, 1.331, 1.4641, 1.61051, 1.771561,88             1.9487171, 2.14358881, 2.35794769, 2.59374246, 2.85311671])89        cls.true_bin_edges = np.array(90            [0.5, 1.5, 2.6000000000000001, 3.81, 5.141, 6.6051, 8.21561,91             9.987171, 11.9358881, 14.07947691, 16.437424601, 19.0311670611,92             21.88428376721])93        cls.true_values = np.array(94            [1., 2., 3., 4.5, 6., 7.5, 9., 10.5, 13., 15.5, 18., 20.])95        cls.true_nsamples = np.array([1, 1, 1, 2, 1, 2, 1, 2, 3, 2, 3, 1])96        cls.f = 0.197    def test_rebin_data_log_runs(self):98        _, _, _, _ = utils.rebin_data_log(self.x, self.y, self.f,99                                          y_err=self.y_err, dx=self.dx)100    def test_method_fails_if_x_and_y_of_unequal_length(self):101        with pytest.raises(ValueError):102            _, _, _, _ = utils.rebin_data_log(self.x[1:], self.y, self.f,103                                              y_err=self.y_err, dx=self.dx)104    def test_method_fails_if_y_and_yerr_of_unequal_length(self):105        with pytest.raises(ValueError):106            _, _, _, _ = utils.rebin_data_log(self.x, self.y, self.f,107                                              y_err=self.y_err[1:], dx=self.dx)108    def test_all_outputs_have_the_same_dimension_except_binx(self):109        binx, biny, binyerr, nsamples = utils.rebin_data_log(self.x, self.y,110                                                             self.f,111                                                             y_err=self.y_err,112                                                             dx=self.dx)113        # binx describes the bin _edges_ rather than midpoints, so has one114        # more entry than biny and the rest115        assert binx.shape[0] == biny.shape[0] + 1116        assert biny.shape[0] == binyerr.shape[0]117        assert binyerr.shape[0] == nsamples.shape[0]118    def test_binning_works_correctly(self):119        binx, _, _, _ = utils.rebin_data_log(self.x, self.y, self.f,120                                             y_err=self.y_err, dx=self.dx)121        assert np.allclose(np.diff(binx), self.true_bins)122    def test_bin_edges_are_correct(self):123        binx, _, _, _ = utils.rebin_data_log(self.x, self.y, self.f,124                                             y_err=self.y_err, dx=self.dx)125        assert np.allclose(binx, self.true_bin_edges)126    def test_bin_values_are_correct(self):127        _, biny, _, _ = utils.rebin_data_log(self.x, self.y, self.f,128                                             y_err=self.y_err, dx=self.dx)129        assert np.allclose(biny, self.true_values)130    def test_nsamples_are_correctly_calculated(self):131        _, _, _, nsamples = utils.rebin_data_log(self.x, self.y, self.f,132                                                 y_err=self.y_err, dx=self.dx)133        assert np.allclose(nsamples, self.true_nsamples)134    def test_method_works_on_complex_numbers(self):135        re = np.arange(self.xmin, self.xmax, self.dx)136        im = np.arange(self.xmin, self.xmax, self.dx)137        y = np.zeros(re.shape[0], dtype=np.complex64)138        yerr = np.zeros(re.shape[0], dtype=complex)139        for k, (r, i) in enumerate(zip(re, im)):140            y[k] = r + i * 1j141            yerr[k] = r + i * 1j142        real_binned = np.zeros(self.true_values.shape[0], dtype=complex)143        for i in range(self.true_values.shape[0]):144            real_binned[i] = self.true_values[i] + self.true_values[i] * 1j145        _, _, binyerr_real, _ = \146            utils.rebin_data_log(self.x, y.real, self.f, y_err=yerr.real,147                                 dx=self.dx)148        _, biny, binyerr, _ = \149            utils.rebin_data_log(self.x, y, self.f, y_err=yerr,150                                 dx=self.dx)151        assert np.iscomplexobj(biny)152        assert np.iscomplexobj(binyerr)153        assert np.allclose(biny, real_binned)154        assert np.allclose(binyerr, binyerr_real + 1.j * binyerr_real)155    def test_return_float_with_floats(self):156        _, biny, binyerr, _ = utils.rebin_data_log(157            self.x, self.y, self.f, y_err=self.y_err, dx=self.dx)158        assert not np.iscomplexobj(biny)159        assert not np.iscomplexobj(binyerr)160class TestUtils(object):161    def test_optimal_bin_time(self):162        assert utils.optimal_bin_time(512, 2.1) == 2163        assert utils.optimal_bin_time(512, 3.9) == 2164        assert utils.optimal_bin_time(512, 4.1) == 4165    def test_order_list_of_arrays(self):166        alist = [np.array([1, 0]), np.array([2, 3])]167        order = np.argsort(alist[0])168        assert np.allclose(np.array([np.array([0, 1]), np.array([3, 2])]), np.array(utils.order_list_of_arrays(alist, order)))169        alist = {"a": np.array([1, 0]), "b": np.array([2, 3])}170        alist_new = utils.order_list_of_arrays(alist, order)171        assert np.allclose(np.array([0, 1]), alist_new["a"])172        assert np.allclose(np.array([3, 2]), alist_new["b"])173        alist = 0174        assert utils.order_list_of_arrays(alist, order) is None175    def test_look_for_array(self):176        assert utils.look_for_array_in_array(np.arange(2), np.arange(1, 3))177        assert not utils.look_for_array_in_array(np.arange(2),178                                                 np.arange(2, 4))179    def test_assign_value_if_none(self):180        assert utils.assign_value_if_none(None, 2) == 2181        assert utils.assign_value_if_none(1, 2) == 1182    def test_apply_function_if_none(self):183        assert utils.apply_function_if_none(None, [1, 2, 3], np.median) == 2184        assert utils.apply_function_if_none(1, [1, 2, 3], np.median) == 1185    def test_contiguous(self):186        """A more complicated example of intersection of GTIs."""187        array = np.array([0, 1, 1, 0, 1, 1, 1], dtype=bool)188        cont = utils.contiguous_regions(array)189        assert np.allclose(cont, np.array([[1, 3], [4, 7]])), \190            'Contiguous region wrong'191    def test_get_random_state(self):192        # Life, Universe and Everything193        lue = 42194        random_state = np.random.RandomState(lue)195        assert utils.get_random_state(None) is np.random.mtrand._rand196        assert np.all(197            utils.get_random_state(lue).randn(lue) == np.random.RandomState(198                lue).randn(lue))199        assert np.all(utils.get_random_state(np.random.RandomState(lue)).randn(200            lue) == np.random.RandomState(lue).randn(lue))201        with pytest.raises(ValueError):202            utils.get_random_state('foobar')203class TestCreateWindow(object):204    @classmethod205    def setup_class(cls):206        cls.N = 5207        cls.uniform_window = 'uniform'208        cls.parzen_window = 'parzen'209        cls.hamming_window = 'hamming'210        cls.hanning_window = 'hanning'211        cls.triangular_window = 'triangular'212        cls.welch_window = 'welch'213        cls.blackmann_window = 'blackmann'214        cls.flattop_window = 'flat-top'215    def test_bad_N(self):216        N_bad = 'abc'217        with pytest.raises(TypeError):218            window = utils.create_window(N_bad, self.uniform_window)219    def test_bad_window_type(self):220        window_bad = 123221        with pytest.raises(TypeError):222            window = utils.create_window(self.N, window_bad)223    def test_not_available_window(self):224        window_not = 'kaiser'225        with pytest.raises(ValueError):226            window = utils.create_window(self.N, window_not)227    def test_N_equals_zero(self):228        N = 0229        window = utils.create_window(N)230        assert len(window) == 0231    def test_uniform_window(self):232        result = np.ones(self.N)233        window = utils.create_window(self.N)234        assert np.allclose(window, result)235    def test_parzen_window(self):236        result = np.array([0, 0.25, 1, 0.25, 0])237        window = utils.create_window(self.N, self.parzen_window)238        assert np.allclose(window, result)239    def test_hamming_window(self):240        result = np.array([0.08, 0.54, 1, 0.54, 0.08])241        window = utils.create_window(self.N, self.hamming_window)242        assert np.allclose(window, result)243    def test_hanning_window(self):244        result = np.array([0, 0.5, 1, 0.5, 0])245        window = utils.create_window(self.N, self.hanning_window)246        assert np.allclose(window, result)247    def test_triangular_window(self):248        result = np.array([0.6, 0.8, 1, 0.8, 0.6])249        window = utils.create_window(self.N, self.triangular_window)250        assert np.allclose(window, result)251    def test_welch_window(self):252        result = np.array([0, 0.75, 1, 0.75, 0])253        window = utils.create_window(self.N, self.welch_window)254        assert np.allclose(window, result)255    def test_blackmann_window(self):256        result = np.array([0.006879, 0.349741, 0.999999, 0.349741, 0.006879])257        window = utils.create_window(self.N, self.blackmann_window)258        assert np.allclose(window, result)259    def test_flat_top_window(self):260        result = np.array(261            [8.67361738e-17, -2.62000000e-01, 4.63600000e+00, -2.62000000e-01,262             8.67361738e-17])263        window = utils.create_window(self.N, self.flattop_window)264        assert np.allclose(window, result)265def test_standard_error():266    x = np.arange(1, 100)267    x = np.array(np.split(x, 3))268    error = utils.standard_error(x, x.mean(axis=0))269    assert np.allclose(error, sem(x))270def test_nearest_power_of_two():271    assert utils.nearest_power_of_two(4) == 4272    assert utils.nearest_power_of_two(5) == 4273    assert utils.nearest_power_of_two(6) == 8274    assert utils.nearest_power_of_two(7) == 8275def test_find_nearest():276    x = np.arange(1, 10)277    assert utils.find_nearest(x, 2) == (2, 1)278    assert utils.find_nearest(x, 4.5) == (5, 4)279    assert utils.find_nearest(x, 7.4) == (7, 6)280def test_equal_count_energy_ranges():281    energies = np.random.uniform(0.3, 12, 1000000)282    edges = utils.equal_count_energy_ranges(energies, 100, emin=0.3, emax=12)283    for e0, e1 in zip(edges[:-1], edges[1:]):284        good = energies[(energies >= e0) & (energies < e1)]285        assert np.count_nonzero(good) == 10000286def test_histogram_equiv_numpy():287    x = np.random.uniform(0., 1., 100)288    H, _, = np.histogram(x, bins=5, range=(0., 1.))289    Hn = utils.histogram(x, bins=5, range=np.array([0., 1.]))290    assert np.all(H == Hn)291def test_histogram2d_equiv_numpy():292    x = np.random.uniform(0., 1., 100)293    y = np.random.uniform(2., 3., 100)294    H, _, _ = np.histogram2d(x, y, bins=np.array((5, 5)),295                             range=[(0., 1.), (2., 3.)])296    Hn = utils.histogram2d(x, y, bins=np.array([5, 5]),297                           range=np.array([[0., 1.], [2., 3.]]))298    assert np.all(H == Hn)299def test_compute_bin():300    bin_edges = np.array([0, 5, 10])301    assert utils.compute_bin(1, bin_edges) == 0302    assert utils.compute_bin(5, bin_edges) == 1...test_check_package.py
Source:test_check_package.py  
1"""Test cases for utils/check-package.2It does not inherit from infra.basetest.BRTest and therefore does not generate3a logfile. Only when the tests fail there will be output to the console.4The make target ('make check-package') is already used by the job5'check-package' and won't be tested here.6"""7import os8import subprocess9import unittest10import infra11def call_script(args, env, cwd):12    """Call a script and return stdout and stderr as lists."""13    out, err = subprocess.Popen(args, cwd=cwd, stdout=subprocess.PIPE,14                                stderr=subprocess.PIPE, env=env,15                                universal_newlines=True).communicate()16    return out.splitlines(), err.splitlines()17class TestCheckPackage(unittest.TestCase):18    """Test the various ways the script can be called.19    The script can be called either using relative path, absolute path or from20    PATH.21    The files to be checked can be passed as arguments using either relative22    path or absolute path.23    When in in-tree mode (without -b) some in-tree files and also all24    out-of-tree files are ignored.25    When in out-tree mode (with -b) the script does generate warnings for these26    but ignores external.mk.27    """28    WITH_EMPTY_PATH = {}29    WITH_UTILS_IN_PATH = {"PATH": infra.basepath("utils") + ":" + os.environ["PATH"]}30    relative = [31        # base_script           base_file               rel_script               rel_file                rel_cwd32        ["utils/check-package", "package/atop/atop.mk", "./utils/check-package", "package/atop/atop.mk", ""],33        ["utils/check-package", "package/atop/atop.mk", "./utils/check-package", "./package/atop/atop.mk", ""],34        ["utils/check-package", "package/atop/atop.mk", "../../utils/check-package", "atop.mk", "package/atop"],35        ["utils/check-package", "package/atop/atop.mk", "../../utils/check-package", "./atop.mk", "package/atop"],36        ["utils/check-package", "package/atop/atop.mk", "../utils/check-package", "atop/atop.mk", "package"],37        ["utils/check-package", "package/atop/atop.mk", "../utils/check-package", "./atop/atop.mk", "package"],38        ["utils/check-package", "package/atop/Config.in", "./utils/check-package", "package/atop/Config.in", ""],39        ["utils/check-package", "package/atop/Config.in", "./utils/check-package", "./package/atop/Config.in", ""],40        ["utils/check-package", "package/atop/Config.in", "../../utils/check-package", "Config.in", "package/atop"],41        ["utils/check-package", "package/atop/Config.in", "../../utils/check-package", "./Config.in", "package/atop"],42        ["utils/check-package", "package/atop/Config.in", "../utils/check-package", "atop/Config.in", "package"],43        ["utils/check-package", "package/atop/Config.in", "../utils/check-package", "./atop/Config.in", "package"]]44    def assert_file_was_processed(self, stderr):45        """Infer from check-package stderr if at least one file was processed46        and fail otherwise."""47        self.assertIn("lines processed", stderr[0], stderr)48        processed = int(stderr[0].split()[0])49        self.assertGreater(processed, 0)50    def assert_file_was_ignored(self, stderr):51        """Infer from check-package stderr if no file was processed and fail52        otherwise."""53        self.assertIn("lines processed", stderr[0], stderr)54        processed = int(stderr[0].split()[0])55        self.assertEqual(processed, 0)56    def assert_warnings_generated_for_file(self, stderr):57        """Infer from check-package stderr if at least one warning was generated58        and fail otherwise."""59        self.assertIn("warnings generated", stderr[1], stderr)60        generated = int(stderr[1].split()[0])61        self.assertGreater(generated, 0)62    def test_run(self):63        """Test the various ways the script can be called in a simple top to64        bottom sequence."""65        # an intree file can be checked by the script called from relative path,66        # absolute path and from PATH67        for base_script, base_file, rel_script, rel_file, rel_cwd in self.relative:68            abs_script = infra.basepath(base_script)69            abs_file = infra.basepath(base_file)70            cwd = infra.basepath(rel_cwd)71            _, m = call_script([rel_script, rel_file],72                               self.WITH_EMPTY_PATH, cwd)73            self.assert_file_was_processed(m)74            _, m = call_script([abs_script, rel_file],75                               self.WITH_EMPTY_PATH, cwd)76            self.assert_file_was_processed(m)77            _, m = call_script(["check-package", rel_file],78                               self.WITH_UTILS_IN_PATH, cwd)79            self.assert_file_was_processed(m)80            _, m = call_script([rel_script, abs_file],81                               self.WITH_EMPTY_PATH, cwd)82            self.assert_file_was_processed(m)83            _, m = call_script([abs_script, abs_file],84                               self.WITH_EMPTY_PATH, cwd)85            self.assert_file_was_processed(m)86            _, m = call_script(["check-package", abs_file],87                               self.WITH_UTILS_IN_PATH, cwd)88            self.assert_file_was_processed(m)89        # some intree files are ignored90        _, m = call_script(["./utils/check-package", "package/pkg-generic.mk"],91                           self.WITH_EMPTY_PATH, infra.basepath())92        self.assert_file_was_ignored(m)93        _, m = call_script(["./utils/check-package", "-b", "package/pkg-generic.mk"],94                           self.WITH_EMPTY_PATH, infra.basepath())95        self.assert_file_was_processed(m)96        # an out-of-tree file can be checked by the script called from relative97        # path, absolute path and from PATH98        for base_script, base_file, rel_script, rel_file, rel_cwd in self.relative:99            abs_script = infra.basepath(base_script)100            abs_file = infra.basepath(base_file)101            cwd = infra.basepath(rel_cwd)102            _, m = call_script([rel_script, "-b", rel_file],103                               self.WITH_EMPTY_PATH, cwd)104            self.assert_file_was_processed(m)105            _, m = call_script([abs_script, "-b", rel_file],106                               self.WITH_EMPTY_PATH, cwd)107            self.assert_file_was_processed(m)108            _, m = call_script(["check-package", "-b", rel_file],109                               self.WITH_UTILS_IN_PATH, cwd)110            self.assert_file_was_processed(m)111            _, m = call_script([rel_script, "-b", abs_file],112                               self.WITH_EMPTY_PATH, cwd)113            self.assert_file_was_processed(m)114            _, m = call_script([abs_script, "-b", abs_file],115                               self.WITH_EMPTY_PATH, cwd)116            self.assert_file_was_processed(m)117            _, m = call_script(["check-package", "-b", abs_file],118                               self.WITH_UTILS_IN_PATH, cwd)119            self.assert_file_was_processed(m)120        # out-of-tree files are are ignored without -b but can generate warnings121        # with -b122        abs_path = infra.filepath("tests/utils/br2-external")123        rel_file = "Config.in"124        abs_file = os.path.join(abs_path, rel_file)125        _, m = call_script(["check-package", rel_file],126                           self.WITH_UTILS_IN_PATH, abs_path)127        self.assert_file_was_ignored(m)128        _, m = call_script(["check-package", abs_file],129                           self.WITH_UTILS_IN_PATH, infra.basepath())130        self.assert_file_was_ignored(m)131        w, m = call_script(["check-package", "-b", rel_file],132                           self.WITH_UTILS_IN_PATH, abs_path)133        self.assert_file_was_processed(m)134        self.assert_warnings_generated_for_file(m)135        self.assertIn("{}:1: empty line at end of file".format(rel_file), w)136        w, m = call_script(["check-package", "-b", abs_file],137                           self.WITH_UTILS_IN_PATH, infra.basepath())138        self.assert_file_was_processed(m)139        self.assert_warnings_generated_for_file(m)140        self.assertIn("{}:1: empty line at end of file".format(abs_file), w)141        # external.mk is ignored only when in the root path of a br2-external142        rel_file = "external.mk"143        abs_file = os.path.join(abs_path, rel_file)144        _, m = call_script(["check-package", "-b", rel_file],145                           self.WITH_UTILS_IN_PATH, abs_path)146        self.assert_file_was_ignored(m)147        _, m = call_script(["check-package", "-b", abs_file],148                           self.WITH_UTILS_IN_PATH, infra.basepath())149        self.assert_file_was_ignored(m)150        abs_path = infra.filepath("tests/utils/br2-external/package/external")151        abs_file = os.path.join(abs_path, rel_file)152        w, m = call_script(["check-package", "-b", rel_file],153                           self.WITH_UTILS_IN_PATH, abs_path)154        self.assert_file_was_processed(m)155        self.assert_warnings_generated_for_file(m)156        self.assertIn("{}:1: should be 80 hashes (http://nightly.buildroot.org/#writing-rules-mk)".format(rel_file), w)157        w, m = call_script(["check-package", "-b", abs_file],158                           self.WITH_UTILS_IN_PATH, infra.basepath())159        self.assert_file_was_processed(m)160        self.assert_warnings_generated_for_file(m)...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
