How to use utils method in toolium

Best Python code snippet using toolium_python

shell.py

Source:shell.py Github

copy

Full Screen

1# Copyright 2010 Jacob Kaplan-Moss2# Copyright 2011 X7 LLC.3# All Rights Reserved.4#5# Licensed under the Apache License, Version 2.0 (the "License"); you may6# not use this file except in compliance with the License. You may obtain7# a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the14# License for the specific language governing permissions and limitations15# under the License.16import getpass17import os18from engineclient import exceptions19from engineclient import utils20from engineclient.v1_1 import servers21AUTO_KEY = object()22def _boot(cs, args, reservation_id=None, min_count=None, max_count=None):23 """Boot a new server."""24 if min_count is None:25 min_count = 126 if max_count is None:27 max_count = min_count28 if min_count > max_count:29 raise exceptions.CommandError("min_instances should be <= "30 "max_instances")31 if not min_count or not max_count:32 raise exceptions.CommandError("min_instances nor max_instances should"33 "be 0")34 if not args.image and not args.block_device_mapping:35 raise exceptions.CommandError("you need to specify an Image ID "36 "or a block device mapping ")37 if not args.flavor:38 raise exceptions.CommandError("you need to specify a Flavor ID ")39 flavor = args.flavor40 image = args.image41 meta = dict(v.split('=') for v in args.meta)42 files = {}43 for f in args.files:44 dst, src = f.split('=', 1)45 try:46 files[dst] = open(src)47 except IOError, e:48 raise exceptions.CommandError("Can't open '%s': %s" % (src, e))49 # use the os-keypair extension50 key_name = None51 if args.key_name is not None:52 key_name = args.key_name53 # or use file injection functionality (independent of os-keypair extension)54 keyfile = None55 if args.key_path is AUTO_KEY:56 possible_keys = 
[os.path.join(os.path.expanduser('~'), '.ssh', k)57 for k in ('id_dsa.pub', 'id_rsa.pub')]58 for k in possible_keys:59 if os.path.exists(k):60 keyfile = k61 break62 else:63 raise exceptions.CommandError("Couldn't find a key file: tried "64 "~/.ssh/id_dsa.pub or ~/.ssh/id_rsa.pub")65 elif args.key_path:66 keyfile = args.key_path67 if keyfile:68 try:69 files['/root/.ssh/authorized_keys2'] = open(keyfile)70 except IOError, e:71 raise exceptions.CommandError("Can't open '%s': %s" % (keyfile, e))72 if args.user_data:73 try:74 userdata = open(args.user_data)75 except IOError, e:76 raise exceptions.CommandError("Can't open '%s': %s" % \77 (args.user_data, e))78 else:79 userdata = None80 if args.availability_zone:81 availability_zone = args.availability_zone82 else:83 availability_zone = None84 if args.security_groups:85 security_groups = args.security_groups.split(',')86 else:87 security_groups = None88 block_device_mapping = {}89 for bdm in args.block_device_mapping:90 device_name, mapping = bdm.split('=', 1)91 block_device_mapping[device_name] = mapping92 nics = []93 for nic_str in args.nics:94 nic_info = {"net-id": "", "v4-fixed-ip": ""}95 for kv_str in nic_str.split(","):96 k, v = kv_str.split("=")97 nic_info[k] = v98 nics.append(nic_info)99 boot_args = [args.name, image, flavor]100 boot_kwargs = dict(101 meta=meta,102 files=files,103 key_name=key_name,104 reservation_id=reservation_id,105 min_count=min_count,106 max_count=max_count,107 userdata=userdata,108 availability_zone=availability_zone,109 security_groups=security_groups,110 block_device_mapping=block_device_mapping,111 nics=nics)112 return boot_args, boot_kwargs113@utils.arg('--flavor',114 default=None,115 metavar='<flavor>',116 help="Flavor ID (see 'engine flavor-list').")117@utils.arg('--image',118 default=None,119 metavar='<image>',120 help="Image ID (see 'engine image-list'). 
")121@utils.arg('--meta',122 metavar="<key=value>",123 action='append',124 default=[],125 help="Record arbitrary key/value metadata. "\126 "May be give multiple times.")127@utils.arg('--file',128 metavar="<dst-path=src-path>",129 action='append',130 dest='files',131 default=[],132 help="Store arbitrary files from <src-path> locally to <dst-path> "\133 "on the new server. You may store up to 5 files.")134@utils.arg('--key_path',135 metavar='<key_path>',136 nargs='?',137 const=AUTO_KEY,138 help="Key the server with an SSH keypair. "\139 "Looks in ~/.ssh for a key, "\140 "or takes an explicit <path> to one. (uses --file functionality)")141@utils.arg('--key_name',142 metavar='<key_name>',143 help="Key name of keypair that should be created earlier with \144 the command keypair-add")145@utils.arg('name', metavar='<name>', help='Name for the new server')146@utils.arg('--user_data',147 default=None,148 metavar='<user-data>',149 help="user data file to pass to be exposed by the metadata server.")150@utils.arg('--availability_zone',151 default=None,152 metavar='<availability-zone>',153 help="zone id.")154@utils.arg('--security_groups',155 default=None,156 metavar='<security_groups>',157 help="comma separated list of security group names.")158@utils.arg('--block_device_mapping',159 metavar="<dev_name=mapping>",160 action='append',161 default=[],162 help="Block device mapping in the format "163 "<dev_name=<id>:<type>:<size(GB)>:<delete_on_terminate>.")164@utils.arg('--nic',165 metavar="<net-id=net-uuid,v4-fixed-ip=ip-addr>",166 action='append',167 dest='nics',168 default=[],169 help="Create a NIC on the server.\n"170 "Specify option multiple times to create multiple NICs.\n"171 "net-id: attach NIC to network with this UUID (optional)\n"172 "v4-fixed-ip: IPv4 fixed address for NIC (optional).")173def do_boot(cs, args):174 """Boot a new server."""175 boot_args, boot_kwargs = _boot(cs, args)176 extra_boot_kwargs = utils.get_resource_manager_extra_kwargs(do_boot, args)177 
boot_kwargs.update(extra_boot_kwargs)178 server = cs.servers.create(*boot_args, **boot_kwargs)179 # Keep any information (like adminPass) returned by create180 info = server._info181 server = cs.servers.get(info['id'])182 info.update(server._info)183 flavor = info.get('flavor', {})184 flavor_id = flavor.get('id', '')185 info['flavor'] = _find_flavor(cs, flavor_id).name186 image = info.get('image', {})187 image_id = image.get('id', '')188 info['image'] = _find_image(cs, image_id).name189 info.pop('links', None)190 info.pop('addresses', None)191 utils.print_dict(info)192@utils.arg('--flavor',193 default=None,194 metavar='<flavor>',195 help="Flavor ID (see 'engine flavor-list')")196@utils.arg('--image',197 default=None,198 metavar='<image>',199 help="Image ID (see 'engine image-list').")200@utils.arg('--meta',201 metavar="<key=value>",202 action='append',203 default=[],204 help="Record arbitrary key/value metadata. "\205 "May be give multiple times.")206@utils.arg('--file',207 metavar="<dst-path=src-path>",208 action='append',209 dest='files',210 default=[],211 help="Store arbitrary files from <src-path> locally to <dst-path> "\212 "on the new server. You may store up to 5 files.")213@utils.arg('--key',214 metavar='<path>',215 nargs='?',216 const=AUTO_KEY,217 help="Key the server with an SSH keypair. "\218 "Looks in ~/.ssh for a key, "\219 "or takes an explicit <path> to one.")220@utils.arg('--reservation_id',221 default=None,222 metavar='<reservation_id>',223 help="Reservation ID (a UUID). "\224 "If unspecified will be generated by the server.")225@utils.arg('--min_instances',226 default=None,227 type=int,228 metavar='<number>',229 help="The minimum number of instances to build. "\230 "Defaults to 1.")231@utils.arg('--max_instances',232 default=None,233 type=int,234 metavar='<number>',235 help="The maximum number of instances to build. 
"\236 "Defaults to 'min_instances' setting.")237@utils.arg('name', metavar='<name>', help='Name for the new server')238def do_zone_boot(cs, args):239 """Boot a new server, potentially across Zones."""240 boot_args, boot_kwargs = _boot(cs,241 args,242 reservation_id=args.reservation_id,243 min_count=args.min_instances,244 max_count=args.max_instances)245 extra_boot_kwargs = utils.get_resource_manager_extra_kwargs(246 do_zone_boot, args)247 boot_kwargs.update(extra_boot_kwargs)248 reservation_id = cs.zones.boot(*boot_args, **boot_kwargs)249 print "Reservation ID=", reservation_id250def _translate_flavor_keys(collection):251 convert = [('ram', 'memory_mb'), ('disk', 'local_gb')]252 for item in collection:253 keys = item.__dict__.keys()254 for from_key, to_key in convert:255 if from_key in keys and to_key not in keys:256 setattr(item, to_key, item._info[from_key])257def do_flavor_list(cs, args):258 """Print a list of available 'flavors' (sizes of servers)."""259 flavors = cs.flavors.list()260 _translate_flavor_keys(flavors)261 utils.print_list(flavors, [262 'ID',263 'Name',264 'Memory_MB',265 'Swap',266 'Local_GB',267 'VCPUs',268 'RXTX_Factor'])269def do_image_list(cs, args):270 """Print a list of available images to boot from."""271 image_list = cs.images.list()272 def parse_server_name(image):273 try:274 return image.server['id']275 except (AttributeError, KeyError):276 return ''277 fmts = {'Server': parse_server_name}278 utils.print_list(image_list, ['ID', 'Name', 'Status', 'Server'], fmts)279@utils.arg('image',280 metavar='<image>',281 help="Name or ID of image")282@utils.arg('action',283 metavar='<action>',284 choices=['set', 'delete'],285 help="Actions: 'set' or 'delete'")286@utils.arg('metadata',287 metavar='<key=value>',288 nargs='+',289 action='append',290 default=[],291 help='Metadata to add/update or delete (only key is necessary on delete)')292def do_image_meta(cs, args):293 """Set or Delete metadata on an image."""294 image = _find_image(cs, args.image)295 
metadata = {}296 for metadatum in args.metadata[0]:297 # Can only pass the key in on 'delete'298 # So this doesn't have to have '='299 if metadatum.find('=') > -1:300 (key, value) = metadatum.split('=', 1)301 else:302 key = metadatum303 value = None304 metadata[key] = value305 if args.action == 'set':306 cs.images.set_meta(image, metadata)307 elif args.action == 'delete':308 cs.images.delete_meta(image, metadata.keys())309def _print_image(image):310 info = image._info.copy()311 # ignore links, we don't need to present those312 info.pop('links')313 # try to replace a server entity to just an id314 server = info.pop('server', None)315 try:316 info['server'] = server['id']317 except (KeyError, TypeError):318 pass319 # break up metadata and display each on its own row320 metadata = info.pop('metadata', {})321 try:322 for key, value in metadata.items():323 _key = 'metadata %s' % key324 info[_key] = value325 except AttributeError:326 pass327 utils.print_dict(info)328@utils.arg('image',329 metavar='<image>',330 help="Name or ID of image")331def do_image_show(cs, args):332 """Show details about the given image."""333 image = _find_image(cs, args.image)334 _print_image(image)335@utils.arg('image', metavar='<image>', help='Name or ID of image.')336def do_image_delete(cs, args):337 """338 Delete an image.339 It should go without saying, but you can only delete images you340 created.341 """342 image = _find_image(cs, args.image)343 image.delete()344@utils.arg('--reservation_id',345 dest='reservation_id',346 metavar='<reservation_id>',347 default=None,348 help='Only return instances that match reservation_id.')349@utils.arg('--recurse_zones',350 dest='recurse_zones',351 metavar='<0|1>',352 nargs='?',353 type=int,354 const=1,355 default=0,356 help='Recurse through all zones if set.')357@utils.arg('--ip',358 dest='ip',359 metavar='<ip_regexp>',360 default=None,361 help='Search with regular expression match by IP address')362@utils.arg('--ip6',363 dest='ip6',364 
metavar='<ip6_regexp>',365 default=None,366 help='Search with regular expression match by IPv6 address')367@utils.arg('--name',368 dest='name',369 metavar='<name_regexp>',370 default=None,371 help='Search with regular expression match by name')372@utils.arg('--instance_name',373 dest='instance_name',374 metavar='<name_regexp>',375 default=None,376 help='Search with regular expression match by instance name')377@utils.arg('--status',378 dest='status',379 metavar='<status>',380 default=None,381 help='Search by server status')382@utils.arg('--flavor',383 dest='flavor',384 metavar='<flavor>',385 type=int,386 default=None,387 help='Search by flavor ID')388@utils.arg('--image',389 dest='image',390 metavar='<image>',391 default=None,392 help='Search by image ID')393@utils.arg('--host',394 dest='host',395 metavar='<hostname>',396 default=None,397 help='Search instances by hostname to which they are assigned')398def do_list(cs, args):399 """List active servers."""400 recurse_zones = args.recurse_zones401 search_opts = {402 'reservation_id': args.reservation_id,403 'recurse_zones': recurse_zones,404 'ip': args.ip,405 'ip6': args.ip6,406 'name': args.name,407 'image': args.image,408 'flavor': args.flavor,409 'status': args.status,410 'host': args.host,411 'instance_name': args.instance_name}412 if recurse_zones:413 id_col = 'UUID'414 else:415 id_col = 'ID'416 columns = [id_col, 'Name', 'Status', 'Networks']417 formatters = {'Networks': utils._format_servers_list_networks}418 utils.print_list(cs.servers.list(search_opts=search_opts), columns,419 formatters)420@utils.arg('--hard',421 dest='reboot_type',422 action='store_const',423 const=servers.REBOOT_HARD,424 default=servers.REBOOT_SOFT,425 help='Perform a hard reboot (instead of a soft one).')426@utils.arg('server', metavar='<server>', help='Name or ID of server.')427def do_reboot(cs, args):428 """Reboot a server."""429 _find_server(cs, args.server).reboot(args.reboot_type)430@utils.arg('server', metavar='<server>', 
help='Name or ID of server.')431@utils.arg('image', metavar='<image>', help="Name or ID of new image.")432@utils.arg('--rebuild_password', dest='rebuild_password',433 metavar='<rebuild_password>', default=False,434 help="Set the provided password on the rebuild instance.")435def do_rebuild(cs, args):436 """Shutdown, re-image, and re-boot a server."""437 server = _find_server(cs, args.server)438 image = _find_image(cs, args.image)439 if args.rebuild_password is not False:440 _password = args.rebuild_password441 else:442 _password = None443 s = server.rebuild(image, _password)444 _print_server(cs, s)445@utils.arg('server', metavar='<server>',446 help='Name (old name) or ID of server.')447@utils.arg('name', metavar='<name>', help='New name for the server.')448def do_rename(cs, args):449 """Rename a server."""450 _find_server(cs, args.server).update(name=args.name)451@utils.arg('server', metavar='<server>', help='Name or ID of server.')452@utils.arg('flavor', metavar='<flavor>', help="Name or ID of new flavor.")453def do_resize(cs, args):454 """Resize a server."""455 server = _find_server(cs, args.server)456 flavor = _find_flavor(cs, args.flavor)457 server.resize(flavor)458@utils.arg('server', metavar='<server>', help='Name or ID of server.')459def do_resize_confirm(cs, args):460 """Confirm a previous resize."""461 _find_server(cs, args.server).confirm_resize()462@utils.arg('server', metavar='<server>', help='Name or ID of server.')463def do_resize_revert(cs, args):464 """Revert a previous resize (and return to the previous VM)."""465 _find_server(cs, args.server).revert_resize()466@utils.arg('server', metavar='<server>', help='Name or ID of server.')467def do_migrate(cs, args):468 """Migrate a server."""469 _find_server(cs, args.server).migrate()470@utils.arg('server', metavar='<server>', help='Name or ID of server.')471def do_pause(cs, args):472 """Pause a server."""473 _find_server(cs, args.server).pause()474@utils.arg('server', metavar='<server>', help='Name or ID 
of server.')475def do_unpause(cs, args):476 """Unpause a server."""477 _find_server(cs, args.server).unpause()478@utils.arg('server', metavar='<server>', help='Name or ID of server.')479def do_suspend(cs, args):480 """Suspend a server."""481 _find_server(cs, args.server).suspend()482@utils.arg('server', metavar='<server>', help='Name or ID of server.')483def do_resume(cs, args):484 """Resume a server."""485 _find_server(cs, args.server).resume()486@utils.arg('server', metavar='<server>', help='Name or ID of server.')487def do_rescue(cs, args):488 """Rescue a server."""489 _find_server(cs, args.server).rescue()490@utils.arg('server', metavar='<server>', help='Name or ID of server.')491def do_unrescue(cs, args):492 """Unrescue a server."""493 _find_server(cs, args.server).unrescue()494@utils.arg('server', metavar='<server>', help='Name or ID of server.')495def do_diagnostics(cs, args):496 """Retrieve server diagnostics."""497 utils.print_dict(cs.servers.diagnostics(args.server)[1])498@utils.arg('server', metavar='<server>', help='Name or ID of server.')499def do_actions(cs, args):500 """Retrieve server actions."""501 utils.print_list(502 cs.servers.actions(args.server),503 ["Created_At", "Action", "Error"])504@utils.arg('server', metavar='<server>', help='Name or ID of server.')505def do_root_password(cs, args):506 """507 Change the root password for a server.508 """509 server = _find_server(cs, args.server)510 p1 = getpass.getpass('New password: ')511 p2 = getpass.getpass('Again: ')512 if p1 != p2:513 raise exceptions.CommandError("Passwords do not match.")514 server.change_password(p1)515@utils.arg('server', metavar='<server>', help='Name or ID of server.')516@utils.arg('name', metavar='<name>', help='Name of snapshot.')517def do_image_create(cs, args):518 """Create a new image by taking a snapshot of a running server."""519 server = _find_server(cs, args.server)520 cs.servers.create_image(server, args.name)521@utils.arg('server',522 metavar='<server>',523 
help="Name or ID of server")524@utils.arg('action',525 metavar='<action>',526 choices=['set', 'delete'],527 help="Actions: 'set' or 'delete'")528@utils.arg('metadata',529 metavar='<key=value>',530 nargs='+',531 action='append',532 default=[],533 help='Metadata to set or delete (only key is necessary on delete)')534def do_meta(cs, args):535 """Set or Delete metadata on a server."""536 server = _find_server(cs, args.server)537 metadata = {}538 for metadatum in args.metadata[0]:539 # Can only pass the key in on 'delete'540 # So this doesn't have to have '='541 if metadatum.find('=') > -1:542 (key, value) = metadatum.split('=', 1)543 else:544 key = metadatum545 value = None546 metadata[key] = value547 if args.action == 'set':548 cs.servers.set_meta(server, metadata)549 elif args.action == 'delete':550 cs.servers.delete_meta(server, metadata.keys())551def _print_server(cs, server):552 # By default when searching via name we will do a553 # findall(name=blah) and due a REST /details which is not the same554 # as a .get() and doesn't get the information about flavors and555 # images. 
This fix it as we redo the call with the id which does a556 # .get() to get all informations.557 if not 'flavor' in server._info:558 server = _find_server(cs, server.id)559 networks = server.networks560 info = server._info.copy()561 for network_label, address_list in networks.items():562 info['%s network' % network_label] = ', '.join(address_list)563 flavor = info.get('flavor', {})564 flavor_id = flavor.get('id', '')565 info['flavor'] = _find_flavor(cs, flavor_id).name566 image = info.get('image', {})567 image_id = image.get('id', '')568 info['image'] = _find_image(cs, image_id).name569 info.pop('links', None)570 info.pop('addresses', None)571 utils.print_dict(info)572@utils.arg('server', metavar='<server>', help='Name or ID of server.')573def do_show(cs, args):574 """Show details about the given server."""575 s = _find_server(cs, args.server)576 _print_server(cs, s)577@utils.arg('server', metavar='<server>', help='Name or ID of server.')578def do_delete(cs, args):579 """Immediately shut down and delete a server."""580 _find_server(cs, args.server).delete()581def _find_server(cs, server):582 """Get a server by name or ID."""583 return utils.find_resource(cs.servers, server)584def _find_image(cs, image):585 """Get an image by name or ID."""586 return utils.find_resource(cs.images, image)587def _find_flavor(cs, flavor):588 """Get a flavor by name, ID, or RAM size."""589 try:590 return utils.find_resource(cs.flavors, flavor)591 except exceptions.NotFound:592 return cs.flavors.find(ram=flavor)593# --zone_username is required since --username is already used.594@utils.arg('zone', metavar='<zone_id>', help='ID of the zone', default=None)595@utils.arg('--api_url', dest='api_url', default=None, help='New URL.')596@utils.arg('--zone_username', dest='zone_username', default=None,597 help='New zone username.')598@utils.arg('--zone_password', dest='zone_password', default=None,599 help='New password.')600@utils.arg('--weight_offset', dest='weight_offset', default=None,601 
help='Child Zone weight offset.')602@utils.arg('--weight_scale', dest='weight_scale', default=None,603 help='Child Zone weight scale.')604def do_zone(cs, args):605 """Show or edit a child zone. No zone arg for this zone."""606 zone = cs.zones.get(args.zone)607 # If we have some flags, update the zone608 zone_delta = {}609 if args.api_url:610 zone_delta['api_url'] = args.api_url611 if args.zone_username:612 zone_delta['username'] = args.zone_username613 if args.zone_password:614 zone_delta['password'] = args.zone_password615 if args.weight_offset:616 zone_delta['weight_offset'] = args.weight_offset617 if args.weight_scale:618 zone_delta['weight_scale'] = args.weight_scale619 if zone_delta:620 zone.update(**zone_delta)621 else:622 utils.print_dict(zone._info)623def do_zone_info(cs, args):624 """Get this zones name and capabilities."""625 zone = cs.zones.info()626 utils.print_dict(zone._info)627@utils.arg('zone_name', metavar='<zone_name>',628 help='Name of the child zone being added.')629@utils.arg('api_url', metavar='<api_url>', help="URL for the Zone's Auth API")630@utils.arg('--zone_username', metavar='<zone_username>',631 help='Optional Authentication username. (Default=None)',632 default=None)633@utils.arg('--zone_password', metavar='<zone_password>',634 help='Authentication password. 
(Default=None)',635 default=None)636@utils.arg('--weight_offset', metavar='<weight_offset>',637 help='Child Zone weight offset (Default=0.0))',638 default=0.0)639@utils.arg('--weight_scale', metavar='<weight_scale>',640 help='Child Zone weight scale (Default=1.0).',641 default=1.0)642def do_zone_add(cs, args):643 """Add a new child zone."""644 zone = cs.zones.create(args.zone_name, args.api_url,645 args.zone_username, args.zone_password,646 args.weight_offset, args.weight_scale)647 utils.print_dict(zone._info)648@utils.arg('zone', metavar='<zone>', help='Name or ID of the zone')649def do_zone_delete(cs, args):650 """Delete a zone."""651 cs.zones.delete(args.zone)652def do_zone_list(cs, args):653 """List the children of a zone."""654 utils.print_list(cs.zones.list(), ['ID', 'Name', 'Is Active', \655 'API URL', 'Weight Offset', 'Weight Scale'])656@utils.arg('server', metavar='<server>', help='Name or ID of server.')657@utils.arg('network_id', metavar='<network_id>', help='Network ID.')658def do_add_fixed_ip(cs, args):659 """Add new IP address to network."""660 server = _find_server(cs, args.server)661 server.add_fixed_ip(args.network_id)662@utils.arg('server', metavar='<server>', help='Name or ID of server.')663@utils.arg('address', metavar='<address>', help='IP Address.')664def do_remove_fixed_ip(cs, args):665 """Remove an IP address from a server."""666 server = _find_server(cs, args.server)667 server.remove_fixed_ip(args.address)668def _find_volume(cs, volume):669 """Get a volume by ID."""670 return utils.find_resource(cs.volumes, volume)671def _find_volume_snapshot(cs, snapshot):672 """Get a volume snapshot by ID."""673 return utils.find_resource(cs.volume_snapshots, snapshot)674def _print_volume(cs, volume):675 utils.print_dict(volume._info)676def _print_volume_snapshot(cs, snapshot):677 utils.print_dict(snapshot._info)678def _translate_volume_keys(collection):679 convert = [('displayName', 'display_name')]680 for item in collection:681 keys = 
item.__dict__.keys()682 for from_key, to_key in convert:683 if from_key in keys and to_key not in keys:684 setattr(item, to_key, item._info[from_key])685def _translate_volume_snapshot_keys(collection):686 convert = [('displayName', 'display_name'), ('volumeId', 'volume_id')]687 for item in collection:688 keys = item.__dict__.keys()689 for from_key, to_key in convert:690 if from_key in keys and to_key not in keys:691 setattr(item, to_key, item._info[from_key])692def do_volume_list(cs, args):693 """List all the volumes."""694 volumes = cs.volumes.list()695 _translate_volume_keys(volumes)696 # Create a list of servers to which the volume is attached697 for vol in volumes:698 servers = [server.get('serverId') for server in vol.attachments]699 setattr(vol, 'attached_to', ','.join(map(str, servers)))700 utils.print_list(volumes, ['ID', 'Status', 'Display Name',701 'Size', 'Attached to'])702@utils.arg('volume', metavar='<volume>', help='ID of the volume.')703def do_volume_show(cs, args):704 """Show details about a volume."""705 volume = _find_volume(cs, args.volume)706 _print_volume(cs, volume)707@utils.arg('size',708 metavar='<size>',709 type=int,710 help='Size of volume in GB')711@utils.arg('--snapshot_id',712 metavar='<snapshot_id>',713 help='Optional snapshot id to create the volume from. (Default=None)',714 default=None)715@utils.arg('--display_name', metavar='<display_name>',716 help='Optional volume name. (Default=None)',717 default=None)718@utils.arg('--display_description', metavar='<display_description>',719 help='Optional volume description. 
(Default=None)',720 default=None)721def do_volume_create(cs, args):722 """Add a new volume."""723 cs.volumes.create(args.size,724 args.snapshot_id,725 args.display_name,726 args.display_description)727@utils.arg('volume', metavar='<volume>', help='ID of the volume to delete.')728def do_volume_delete(cs, args):729 """Remove a volume."""730 volume = _find_volume(cs, args.volume)731 volume.delete()732@utils.arg('server',733 metavar='<server>',734 help='Name or ID of server.')735@utils.arg('volume',736 metavar='<volume>',737 type=int,738 help='ID of the volume to attach.')739@utils.arg('device', metavar='<device>',740 help='Name of the device e.g. /dev/vdb.')741def do_volume_attach(cs, args):742 """Attach a volume to a server."""743 cs.volumes.create_server_volume(_find_server(cs, args.server).id,744 args.volume,745 args.device)746@utils.arg('server',747 metavar='<server>',748 help='Name or ID of server.')749@utils.arg('attachment_id',750 metavar='<volume>',751 type=int,752 help='Attachment ID of the volume.')753def do_volume_detach(cs, args):754 """Detach a volume from a server."""755 cs.volumes.delete_server_volume(_find_server(cs, args.server).id,756 args.attachment_id)757def do_volume_snapshot_list(cs, args):758 """List all the snapshots."""759 snapshots = cs.volume_snapshots.list()760 _translate_volume_snapshot_keys(snapshots)761 utils.print_list(snapshots, ['ID', 'Volume ID', 'Status', 'Display Name',762 'Size'])763@utils.arg('snapshot', metavar='<snapshot>', help='ID of the snapshot.')764def do_volume_snapshot_show(cs, args):765 """Show details about a snapshot."""766 snapshot = _find_volume_snapshot(cs, args.snapshot)767 _print_volume_snapshot(cs, snapshot)768@utils.arg('volume_id',769 metavar='<volume_id>',770 type=int,771 help='ID of the volume to snapshot')772@utils.arg('--force',773 metavar='<True|False>',774 help='Optional flag to indicate whether to snapshot a volume even if its '775 'attached to an instance. 
(Default=False)',776 default=False)777@utils.arg('--display_name', metavar='<display_name>',778 help='Optional snapshot name. (Default=None)',779 default=None)780@utils.arg('--display_description', metavar='<display_description>',781 help='Optional snapshot description. (Default=None)',782 default=None)783def do_volume_snapshot_create(cs, args):784 """Add a new snapshot."""785 cs.volume_snapshots.create(args.volume_id,786 args.force,787 args.display_name,788 args.display_description)789@utils.arg('snapshot_id',790 metavar='<snapshot_id>',791 help='ID of the snapshot to delete.')792def do_volume_snapshot_delete(cs, args):793 """Remove a snapshot."""794 snapshot = _find_volume_snapshot(cs, args.snapshot_id)795 snapshot.delete()796def _print_floating_ip_list(floating_ips):797 utils.print_list(floating_ips, ['Ip', 'Instance Id', 'Fixed Ip'])798@utils.arg('server', metavar='<server>', help='Name or ID of server.')799@utils.arg('address', metavar='<address>', help='IP Address.')800def do_add_floating_ip(cs, args):801 """Add a floating IP address to a server."""802 server = _find_server(cs, args.server)803 server.add_floating_ip(args.address)804@utils.arg('server', metavar='<server>', help='Name or ID of server.')805@utils.arg('address', metavar='<address>', help='IP Address.')806def do_remove_floating_ip(cs, args):807 """Remove a floating IP address from a server."""808 server = _find_server(cs, args.server)809 server.remove_floating_ip(args.address)810def do_floating_ip_create(cs, args):811 """Allocate a floating IP for the current tenant."""812 _print_floating_ip_list([cs.floating_ips.create()])813@utils.arg('address', metavar='<address>', help='IP of Floating Ip.')814def do_floating_ip_delete(cs, args):815 """De-allocate a floating IP."""816 floating_ips = cs.floating_ips.list()817 for floating_ip in floating_ips:818 if floating_ip.ip == args.address:819 return cs.floating_ips.delete(floating_ip.id)820 raise exceptions.CommandError("Floating ip %s not found.", 
args.address)821def do_floating_ip_list(cs, args):822 """List floating ips for this tenant."""823 _print_floating_ip_list(cs.floating_ips.list())824def _print_dns_list(dns_entries):825 utils.print_list(dns_entries, ['ip', 'zone', 'name'])826def do_dns_zones(cs, args):827 """Print a list of available dns zones."""828 zones = cs.floating_ip_dns.zones()829 utils.print_list(zones, ['zone'])830@utils.arg('zone', metavar='<zone>', help='DNS zone')831@utils.arg('--ip', metavar='<ip>', help='ip address', default=None)832@utils.arg('--name', metavar='<name>', help='DNS name', default=None)833def do_dns_list(cs, args):834 """List current DNS entries for zone and ip or zone and name."""835 if not (args.ip or args.name):836 raise exceptions.CommandError(837 "You must specify either --ip or --name")838 entries = cs.floating_ip_dns.get_entries(args.zone,839 ip=args.ip, name=args.name)840 _print_dns_list(entries)841@utils.arg('zone', metavar='<zone>', help='DNS zone')842@utils.arg('name', metavar='<name>', help='DNS name')843@utils.arg('ip', metavar='<ip>', help='ip address')844@utils.arg('--type', metavar='<type>', help='dns type (e.g. 
"A")',845 default='A')846def do_dns_create(cs, args):847 """Create a DNS entry for zone, name and ip."""848 entries = cs.floating_ip_dns.create_entry(args.zone, args.name,849 args.ip, args.type)850 _print_dns_list([entries])851@utils.arg('zone', metavar='<zone>', help='DNS zone')852@utils.arg('name', metavar='<name>', help='DNS name')853def do_dns_delete(cs, args):854 """Delete the specified DNS entry."""855 cs.floating_ip_dns.delete_entry(args.zone, args.name)856def _print_secgroup_rules(rules):857 class FormattedRule:858 def __init__(self, obj):859 items = (obj if isinstance(obj, dict) else obj._info).items()860 for k, v in items:861 if k == 'ip_range':862 v = v.get('cidr')863 elif k == 'group':864 k = 'source_group'865 v = v.get('name')866 if v is None:867 v = ''868 setattr(self, k, v)869 rules = [FormattedRule(rule) for rule in rules]870 utils.print_list(rules, ['IP Protocol', 'From Port', 'To Port',871 'IP Range', 'Source Group'])872def _print_secgroups(secgroups):873 utils.print_list(secgroups, ['Name', 'Description'])874def _get_secgroup(cs, secgroup):875 for s in cs.security_groups.list():876 if secgroup == s.name:877 return s878 raise exceptions.CommandError("Secgroup %s not found" % secgroup)879@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')880@utils.arg('ip_proto', metavar='<ip_proto>', help='ip_proto (icmp, tcp, udp).')881@utils.arg('from_port', metavar='<from_port>', help='Port at start of range.')882@utils.arg('to_port', metavar='<to_port>', help='Port at end of range.')883@utils.arg('cidr', metavar='<cidr>', help='CIDR for address range.')884def do_secgroup_add_rule(cs, args):885 """Add a rule to a security group."""886 secgroup = _get_secgroup(cs, args.secgroup)887 rule = cs.security_group_rules.create(secgroup.id,888 args.ip_proto,889 args.from_port,890 args.to_port,891 args.cidr)892 _print_secgroup_rules([rule])893@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')894@utils.arg('ip_proto', 
metavar='<ip_proto>', help='ip_proto (icmp, tcp, udp).')895@utils.arg('from_port', metavar='<from_port>', help='Port at start of range.')896@utils.arg('to_port', metavar='<to_port>', help='Port at end of range.')897@utils.arg('cidr', metavar='<cidr>', help='CIDR for address range.')898def do_secgroup_delete_rule(cs, args):899 """Delete a rule from a security group."""900 secgroup = _get_secgroup(cs, args.secgroup)901 for rule in secgroup.rules:902 if (rule['ip_protocol'] == args.ip_proto and903 rule['from_port'] == int(args.from_port) and904 rule['to_port'] == int(args.to_port) and905 rule['ip_range']['cidr'] == args.cidr):906 return cs.security_group_rules.delete(rule['id'])907 raise exceptions.CommandError("Rule not found")908@utils.arg('name', metavar='<name>', help='Name of security group.')909@utils.arg('description', metavar='<description>',910 help='Description of security group.')911def do_secgroup_create(cs, args):912 """Create a security group."""913 _print_secgroups([cs.security_groups.create(args.name, args.description)])914@utils.arg('secgroup', metavar='<secgroup>', help='Name of security group.')915def do_secgroup_delete(cs, args):916 """Delete a security group."""917 cs.security_groups.delete(_get_secgroup(cs, args.secgroup))918def do_secgroup_list(cs, args):919 """List security groups for the curent tenant."""920 _print_secgroups(cs.security_groups.list())921@utils.arg('secgroup', metavar='<secgroup>', help='Name of security group.')922def do_secgroup_list_rules(cs, args):923 """List rules for a security group."""924 secgroup = _get_secgroup(cs, args.secgroup)925 _print_secgroup_rules(secgroup.rules)926@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')927@utils.arg('source_group', metavar='<source_group>',928 help='ID of source group.')929@utils.arg('--ip_proto', metavar='<ip_proto>',930 help='ip_proto (icmp, tcp, udp).')931@utils.arg('--from_port', metavar='<from_port>',932 help='Port at start of 
range.')933@utils.arg('--to_port', metavar='<to_port>', help='Port at end of range.')934def do_secgroup_add_group_rule(cs, args):935 """Add a source group rule to a security group."""936 secgroup = _get_secgroup(cs, args.secgroup)937 source_group = _get_secgroup(cs, args.source_group)938 params = {}939 params['group_id'] = source_group.id940 if args.ip_proto or args.from_port or args.to_port:941 if not (args.ip_proto and args.from_port and args.to_port):942 raise exceptions.CommandError("ip_proto, from_port, and to_port"943 " must be specified together")944 params['ip_protocol'] = args.ip_proto945 params['from_port'] = args.from_port946 params['to_port'] = args.to_port947 rule = cs.security_group_rules.create(secgroup.id, **params)948 _print_secgroup_rules([rule])949@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')950@utils.arg('source_group', metavar='<source_group>',951 help='ID of source group.')952@utils.arg('--ip_proto', metavar='<ip_proto>',953 help='ip_proto (icmp, tcp, udp).')954@utils.arg('--from_port', metavar='<from_port>',955 help='Port at start of range.')956@utils.arg('--to_port', metavar='<to_port>', help='Port at end of range.')957def do_secgroup_delete_group_rule(cs, args):958 """Delete a source group rule from a security group."""959 secgroup = _get_secgroup(cs, args.secgroup)960 source_group = _get_secgroup(cs, args.source_group)961 params = {}962 params['group_name'] = source_group.name963 if args.ip_proto or args.from_port or args.to_port:964 if not (args.ip_proto and args.from_port and args.to_port):965 raise exceptions.CommandError("ip_proto, from_port, and to_port"966 " must be specified together")967 params['ip_protocol'] = args.ip_proto968 params['from_port'] = int(args.from_port)969 params['to_port'] = int(args.to_port)970 for rule in secgroup.rules:971 if (rule.get('ip_protocol') == params.get('ip_protocol') and972 rule.get('from_port') == params.get('from_port') and973 rule.get('to_port') == 
params.get('to_port') and974 rule.get('group', {}).get('name') ==\975 params.get('group_name')):976 return cs.security_group_rules.delete(rule['id'])977 raise exceptions.CommandError("Rule not found")978@utils.arg('name', metavar='<name>', help='Name of key.')979@utils.arg('--pub_key', metavar='<pub_key>', help='Path to a public ssh key.',980 default=None)981def do_keypair_add(cs, args):982 """Create a new key pair for use with instances"""983 name = args.name984 pub_key = args.pub_key985 if pub_key:986 try:987 with open(pub_key) as f:988 pub_key = f.read()989 except IOError, e:990 raise exceptions.CommandError("Can't open or read '%s': %s" % \991 (pub_key, e))992 keypair = cs.keypairs.create(name, pub_key)993 if not pub_key:994 private_key = keypair.private_key995 print private_key996@utils.arg('name', metavar='<name>', help='Keypair name to delete.')997def do_keypair_delete(cs, args):998 """Delete keypair by its id"""999 name = args.name1000 cs.keypairs.delete(name)1001def do_keypair_list(cs, args):1002 """Print a list of keypairs for a user"""1003 keypairs = cs.keypairs.list()1004 columns = ['Name', 'Fingerprint']1005 utils.print_list(keypairs, columns)1006def do_absolute_limits(cs, args):1007 """Print a list of absolute limits for a user"""1008 limits = cs.limits.get().absolute1009 columns = ['Name', 'Value']1010 utils.print_list(limits, columns)1011def do_rate_limits(cs, args):1012 """Print a list of rate limits for a user"""1013 limits = cs.limits.get().rate1014 columns = ['Verb', 'URI', 'Value', 'Remain', 'Unit', 'Next_Available']...

Full Screen

Full Screen

trape.py

Source:trape.py Github

copy

Full Screen

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#**
#
#########
# trape #
#########
#
# trape depends of this file
# For full copyright information this visit: https://github.com/jofpin/trape
#
# Copyright 2018 by Jose Pino (@jofpin) / <jofpin@gmail.com>
#**
import time
import json
import urllib
import urllib.request
from core.dependence import urllib2
import http.client
import argparse
import socket
import sys
import os
from core.utils import utils
import subprocess
import requests
import hashlib, binascii
from threading import Timer
from multiprocessing import Process
import atexit


class Trape(object):
    """Runtime configuration and console front-end of the tool.

    With ``stat == 1`` the constructor checks connectivity, loads (or
    interactively creates) ``trape.config``, parses the command line and
    prompts for any missing URL/port. With any other ``stat`` only the
    random paths/tokens and static file tables are initialised.
    """

    def __init__(self, stat = 0):
        self.name_trape = "Trape"
        self.version = "2.1"
        self.stats_path = "ngrok"
        self.home_path = utils.generateToken(18)
        self.logout_path = utils.generateToken(6)
        self.remove_path = utils.generateToken(14)
        self.injectURL = utils.generateToken(12) + '.js'
        self.stats_key = utils.generateToken(24)
        self.date_start = time.strftime("%Y-%m-%d - %H:%M:%S")
        self.stat = stat
        self.localIp = '127.0.0.1'
        self.nGrokUrl = ''
        # Each static file gets a freshly generated token as its "src".
        self.JSFiles = (
            {"path" : "base.js", "src" : utils.generateToken(12)},
            {"path" : "libs.min.js", "src" : utils.generateToken(12)},
            {"path" : "login.js", "src" : utils.generateToken(12)},
            {"path" : "payload.js", "src" : utils.generateToken(12)},
            {"path" : "trape.js", "src" : utils.generateToken(12)},
            {"path" : "vscript.js", "src" : utils.generateToken(12)},
            {"path" : "custom.js", "src" : utils.generateToken(12)},
        )
        self.CSSFiles = (
            {"path" : "/static/img/favicon.ico", "src" : utils.generateToken(12)},
            {"path" : "/static/img/favicon.png", "src" : utils.generateToken(12)},
            {"path" : "/static/css/base-icons.css", "src" : utils.generateToken(12)},
            {"path" : "/static/css/styles.css", "src" : utils.generateToken(12)},
            {"path" : "/static/css/normalize.min.css", "src" : utils.generateToken(12)},
            {"path": "/static/css/services-icons.css", "src" : utils.generateToken(12)},
        )
        if self.stat == 1:
            # Connectivity probe: a HEAD request that must succeed.
            c = http.client.HTTPConnection('www.google.com', timeout=5)
            try:
                c.request("HEAD", "/")
                c.close()
            except Exception:
                c.close()
                utils.Go("\033[H\033[J")
                utils.Go(utils.Color['whiteBold'] + "[" + utils.Color['redBold'] + "x" + utils.Color['whiteBold'] + "]" + utils.Color['redBold'] + " " + "NOTICE: " + utils.Color['white'] + "Trape needs Internet connection for working" + "\n\t")
                sys.exit(0)

            # Previously a corrupt config re-ran the wizard but never
            # reloaded the file, leaving the config variable unbound.
            config_trape = self._load_config()
            self.ngrok = config_trape['ngrok_token']
            self.gmaps = config_trape['gmaps_api_key']
            self.ipinfo = config_trape['ipinfo_api_key']
            if self.gmaps == '':
                self.gmaps = 'AIzaSyA30wEa2DwUuddmNTHvoprhnrB2w_aCWbs'
            self.googl = config_trape['gshortener_api_key']
            if self.googl == '':
                self.googl = 'AIzaSyDHMDTOGo9L1OBl5vRxOVM6vpXOXVp5jCc'

            parser = argparse.ArgumentParser("python3 trape.py -u <<Url>> -p <<Port>>")
            parser.add_argument('-u', '--url', dest='url', help='Put the web page url to clone')
            parser.add_argument('-p', '--port', dest='port', help='Insert your port')
            parser.add_argument('-ak', '--accesskey', dest='accesskey', help='Insert your custom key access')
            parser.add_argument('-l', '--local', dest='local', help='Insert your home file')
            parser.add_argument('-n', '--ngrok', dest='ngrok', help='Insert your ngrok Authtoken', action='store_true')
            parser.add_argument('-ic', '--injectcode', dest='injc', help='Insert your custom REST API path')
            parser.add_argument('-ud', '--update', dest='update', action='store_true', default=False, help='Update trape to the latest version')
            options = parser.parse_args()
            self.type_lure = 'global'
            # Check current updates
            if options.update:
                utils.Go("\033[H\033[J")
                utils.Go("Updating..." + " " + utils.Color['blue'] + "trape" + utils.Color['white'] + "..." + "\n")
                subprocess.check_output(["git", "reset", "--hard", "origin/master"])
                subprocess.check_output(["git", "pull"])
                utils.Go("Trape Updated... Please execute again...")
                sys.exit(0)
            if options.url is None:
                utils.Go("\033[H\033[J")
                utils.Go("----------------------------------------------")
                utils.Go("" + " " + utils.Color['redBold'] + "TRAPE" + utils.Color['white'] +" {" + utils.Color['yellowBold'] + "stable" + utils.Color['white'] + "}" + utils.Color['white'] + " - " + "Osint and analytics tool" + " " + "<" +utils.Color['white'])
                utils.Go("----------------------------------------------")
                utils.Go("| v" + utils.Color['redBold'] + self.version + utils.Color['white'] + " |")
                utils.Go("--------" + "\n")
                utils.Go(utils.Color['whiteBold'] + "[" + utils.Color['greenBold'] + "!" + utils.Color['whiteBold'] + "]" + " " + utils.Color['white'] + "Enter the information requested below to complete the execution" + utils.Color['white'])
                utils.Go("")
                options.url = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " Enter a URL to generate the lure" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])
            if options.port is None:
                options.port = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " What is your port to generate the server?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])
            while utils.checkPort(int(options.port)) == False:
                utils.Go("\033[H\033[J")
                utils.Go("----------------------------------------------")
                utils.Go("" + " " + utils.Color['redBold'] + "TRAPE" + utils.Color['white'] +" {" + utils.Color['yellowBold'] + "stable" + utils.Color['white'] + "}" + utils.Color['white'] + " - " + "Osint and analytics tool" + " " + "<" +utils.Color['white'])
                utils.Go("----------------------------------------------")
                utils.Go("\n")
                utils.Go(utils.Color['whiteBold'] + "[" + utils.Color['redBold'] + "x" + utils.Color['whiteBold'] + "]" + utils.Color['redBold'] + " " + "ERROR:" + " " + utils.Color['whiteBold'] + "The port: " + options.port + utils.Color['white'] + " " + "is not available, It was previously used (" + utils.Color['yellow'] + "Use another port" + utils.Text['end'] + ")" + "\n\n")
                options.port = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " What is your port to generate the server?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])
                # NOTE(review): the enclosing `while` for this prompt is
                # commented out; it is placed inside the port loop here on
                # the assumption that its original indentation was kept.
                # Indentation is not recoverable from this copy - confirm.
                #while utils.checkUrl(str(options.url)) == False:
                options.url = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " Enter a URL to generate the lure" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])
            utils.Go("")
            utils.Go(utils.Color['greenBold'] + "-" + utils.Color['white'] + " Successful " + utils.Color['greenBold'] + "startup" + utils.Color['white'] + ", get lucky on the way!" + utils.Color['white'])
            utils.Go("")
            time.sleep(0.1)
            # Connecting a UDP socket makes the OS pick the outgoing
            # interface, whose address is then read back.
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                s.connect(("8.8.8.8", 80))
                self.localIp = s.getsockname()[0]
            finally:
                s.close()
            self.app_port = int(options.port)
            self.url_to_clone = str(options.url)
            if self.url_to_clone[0:4] != 'http':
                self.url_to_clone = 'http://' + self.url_to_clone
            self.victim_path = options.url.replace("http://", "").replace("https://", "")
            if (options.ngrok or (self.ngrok != "")):
                if self.ngrok == '':
                    utils.Go("\033[H\033[J")
                    self.ngrok = input("What is your nGrok token?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])
                if (self.ngrok != ''):
                    from core.ngrok import ngrok
                    import os.path as path
                    v_ngrok = ngrok(self.ngrok, self.app_port, stat, self.stats_path)
                else:
                    utils.Go(utils.Color['whiteBold'] + "[" + utils.Color['redBold'] + "x" + utils.Color['whiteBold'] + "]" + utils.Color['redBold'] + " " + "ERROR: " + " " + utils.Color['white'] + "Your nGrok authtoken can't be empty")

            # Custom name of REST API
            if (options.injc):
                self.injectURL = options.injc
            # Custom access token
            if (options.accesskey):
                self.stats_key = options.accesskey

    def _load_config(self):
        """Return the parsed ``trape.config``, creating it when needed.

        Runs the interactive wizard when the file is missing, and removes
        the file and runs the wizard again when it cannot be read or parsed.
        """
        while True:
            if not os.path.exists("trape.config"):
                self.trape_config()
            try:
                with open("trape.config") as config_file:
                    return json.load(config_file)
            except (OSError, ValueError):
                # json.JSONDecodeError is a ValueError subclass.
                if os.path.exists("trape.config"):
                    os.remove("trape.config")

    # Design principal of the header of trape
    def header(self):
        """Print the start-up banner, update status and access URLs."""
        if self.stat == 1:
            # Principal header of tool
            utils.banner()
            # Update verification
            changeLog = requests.get("https://raw.githubusercontent.com/jofpin/trape/master/version.txt", timeout = 4)
            changeLog = changeLog.text.split(" ")[1]
            changeLog = changeLog.strip()
            if changeLog != self.version:
                utils.Go(utils.Color['white'] + "\t" + utils.Color['yellowBold'] + "@" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "[" + utils.Color['whiteBold'] + " " + "UPDATES:" + " " + utils.Color['yellowBold'] + "NEW VERSION IS AVAILABLE: " + utils.Color['white'] + "v" + utils.Color['redBold'] + changeLog + utils.Color['white'] + " " + "(install changes)")
                utils.Go("")
            else:
                utils.Go(utils.Color['white'] + "\t" + utils.Color['yellowBold'] + "@" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "[" + utils.Color['whiteBold'] + " " + "UPDATES:" + " " + utils.Color['greenBold'] + "RUNNING RECENT VERSION" + utils.Color['white'])
                utils.Go("")
            # Local information vars
            utils.Go(utils.Color['white'] + "\t" + utils.Color['whiteBold'] + "LOCAL INFORMATION" + utils.Text['end'])
            utils.Go("\t" + "-------------------")
            utils.Go(utils.Color['white'] + "\t" + utils.Color['green'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "[" + utils.Color['white'] + " Lure for the users: " + utils.Color['blue'] + 'http://' + self.localIp + ':' + str(self.app_port) + '/' + self.victim_path)
            utils.Go(utils.Color['white'] + "\t" + utils.Color['green'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "[" + utils.Color['white'] + " Your REST API path: " + utils.Color['blue'] + 'http://' + self.localIp + ':' + str(self.app_port) + '/' + self.injectURL + utils.Color['white'])
            utils.Go(utils.Color['white'] + "\t" + utils.Color['green'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "[" + utils.Color['white'] + " Control Panel Link: " + utils.Color['blue'] + "http://127.0.0.1:" + utils.Color['blue'] + str(self.app_port) + '/' + self.stats_path)
            utils.Go(utils.Color['white'] + "\t" + utils.Color['green'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "[" + utils.Color['white'] + " Your Access key: " + utils.Color['blue'] + self.stats_key + utils.Color['white'])
            utils.Go("")
            if self.ngrok != '':
                if self.googl == '':
                    self.googl = 'AIzaSyCPzcppCT27KTHnxAIQvYhtvB_l8sKGYBs'
                try:
                    opener = urllib.request.build_opener()
                    pLog = 4040
                    # The local tunnels API response is searched as text
                    # for the public ngrok.io URL.
                    ngrokStatus = str(opener.open('http://127.0.0.1:' + str(pLog) + '/api/tunnels').read()).replace('\n', '').replace(' ', '')
                    time.sleep(0.5)
                    ngrokUrlPos = ngrokStatus.find('ngrok.io')
                    if ngrokUrlPos <= 0:
                        # Not found on the first read: wait and retry once.
                        time.sleep(4)
                        ngrokStatus = str(opener.open('http://127.0.0.1:' + str(pLog) + '/api/tunnels').read()).replace('\n', '').replace(' ', '')
                        ngrokUrlPos = ngrokStatus.find('ngrok.io')
                    if ngrokUrlPos >= 0:
                        # Cut a window around the match, then trim it to
                        # the span from "http" up to ".io".
                        ngrokStatus = ngrokStatus[ngrokUrlPos-25:ngrokUrlPos+28]
                        ngrokUrlPos = ngrokStatus.find('http')
                        ngrokUrlPos2 = ngrokStatus.find('.io')
                        ngrokStatus = ngrokStatus[ngrokUrlPos: ngrokUrlPos2] + '.io'
                        utils.Go(utils.Color['white'] + "\t" + utils.Color['whiteBold'] + "PUBLIC INFORMATION" + utils.Text['end'])
                        utils.Go("\t" + "-------------------")
                        r = utils.gShortener(self.googl, ngrokStatus.replace('https', 'http') + '/' + self.victim_path)
                        self.nGrokUrl = ngrokStatus.replace('https', 'http')
                        utils.Go(utils.Color['white'] + "\t" + utils.Color['yellow'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "[" + utils.Color['white'] + " Public lure: " + utils.Color['blue'] + self.nGrokUrl + '/' + self.victim_path + utils.Color['white'])
                        utils.Go(utils.Color['white'] + "\t" + utils.Color['yellow'] + ">" + utils.Color['white'] + "-" + utils.Color['blue'] + "=" + utils.Color['white'] + "[" + utils.Color['white'] + " Control Panel link: " + utils.Color['blue'] + ngrokStatus.replace('https', 'http') + '/' + self.stats_path + utils.Color['white'])
                    else:
                        utils.Go(utils.Color['red'] + "\t" + utils.Color['green'] + "-" + utils.Color['white'] + "--" + utils.Color['red'] + "=" + utils.Color['white'] + "[" + utils.Color['white'] + " We can't connect with nGrok " + utils.Color['white'])
                except Exception as e:
                    # Python 3 exceptions have no .message attribute.
                    utils.Go(utils.Color['white'] + "[" + utils.Color['redBold'] + "x" + utils.Color['whiteBold'] + "]" + utils.Color['redBold'] + " " + "ERROR: " + " " + utils.Color['white'] + str(e))
                    utils.Go(utils.Color['red'] + "\t" + utils.Color['green'] + "-" + utils.Color['white'] + "--" + utils.Color['red'] + "=" + utils.Color['white'] + "[" + utils.Color['white'] + " We can't connect with nGrok " + utils.Color['white'])
            utils.Go("\n" + utils.Color['white'])
            utils.Go(utils.Color['white'] + "[" + utils.Color['greenBold'] + ">" + utils.Color['white'] + "]" + utils.Color['whiteBold'] + " " + "Start time:" + " " + utils.Color['white'] + self.date_start)
            utils.Go(utils.Color['white'] + "[" + utils.Color['greenBold'] + "?" + utils.Color['white'] + "]" + utils.Color['white'] + " " + "Do not forget to close " + self.name_trape + ", after use. Press Control C" + " " + utils.Color['white'] + '\n')
            utils.Go(utils.Color['white'] + "[" + utils.Color['greenBold'] + "¡" + utils.Color['white'] + "]" + utils.Color['white'] + " " + "Waiting for the users to fall..." + "\n")

    # Important: in the process of use is possible that will ask for the root
    def rootConnection(self):
        """Placeholder; intentionally does nothing."""
        pass

    # Detect operating system, to compose the compatibility
    def loadCheck(self):
        """Delegate the operating-system check to ``utils.checkOS``."""
        utils.checkOS()

    # the main file (trape.py)
    def main(self):
        """Import ``core.sockets`` (the import itself is the only action)."""
        import core.sockets

    # Create config file
    def trape_config(self):
        """Interactively collect API tokens and write ``trape.config``.

        The wizard is restarted when the ngrok token or the Google Maps
        key is left empty.
        """
        utils.Go("\033[H\033[J")
        utils.Go("----------------------------------------------------------")
        utils.Go("" + " " + utils.Color['redBold'] + "TRAPE" + utils.Color['white'] +" {" + utils.Color['yellowBold'] + "stable" + utils.Color['white'] + "}" + utils.Color['white'] + " - " + "Configuration zone to use the software" + " " + "<" + utils.Color['white'])
        utils.Go("----------------------------------------------------------")
        utils.Go("| v" + utils.Color['redBold'] + self.version + utils.Color['white'] + " |")
        utils.Go("--------" + "\n")
        utils.Go(utils.Color['whiteBold'] + "GENERAL CONFIG" + utils.Color['white'])
        utils.Go("------")
        utils.Go("Through this section you will configure the resources required \nfor an effective function of trape, please complete the following steps, below. \nKeep in mind that if the data is incorrect this tool will not work." + utils.Color['white'])
        utils.Go("")
        utils.Go(utils.Color['whiteBold'] + "NGROK TOKEN" + utils.Color['white'])
        utils.Go("------")
        utils.Go("In the next section you must enter your Ngrok token, if you do not have \none register at (" + utils.Color['blueBold'] + "https://ngrok.com" + utils.Color['white'] + "), this data is necessary for the generation of public network tunnels.")
        utils.Go("")
        c_nGrokToken = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " Enter your ngrok token" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])
        utils.Go("")
        utils.Go(utils.Color['whiteBold'] + "GOOGLE API" + utils.Color['white'])
        utils.Go("------")
        utils.Go("You must register with the " + utils.Color['blueBold'] + "Google Console" + utils.Color['white'] + ", and get an API for maps and another for shortening. \nBy having these data you complete the settings")
        utils.Go("")
        c_gMapsToken = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " What is your Google Maps Api Key?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])
        c_gOoglToken = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " Enter your Goo.gl (shortener) Api Key (leave it empty if you don't have)" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])
        utils.Go("")
        utils.Go(utils.Color['whiteBold'] + "IP INFO API" + utils.Color['white'])
        utils.Go("------")
        utils.Go("You must register with the " + utils.Color['blueBold'] + "https://ipgeolocation.io" + utils.Color['white'] + ", and get an API for geolocation. \nBy having these data you complete the settings")
        utils.Go("")
        c_ipinfo = input(utils.Color['blueBold'] + "-" + utils.Color['white'] + " What is your IP Info Api Key?" + " " + utils.Color['yellow'] + ":~> " + utils.Color['white'])
        utils.Go("")
        utils.Go(utils.Color['greenBold'] + "-" + utils.Color['white'] + " Congratulations! " + utils.Color['greenBold'] + "Successful configuration" + utils.Color['white'] + ", now enjoy Trape!" + utils.Color['white'])
        utils.Go("")
        time.sleep(0.4)
        if (c_nGrokToken != '' and c_gMapsToken != ''):
            # Serialise with json so quotes or backslashes in the entered
            # values cannot produce an unparseable config file.
            config = {
                "ngrok_token": c_nGrokToken,
                "gmaps_api_key": c_gMapsToken,
                "gshortener_api_key": c_gOoglToken,
                "ipinfo_api_key": c_ipinfo,
            }
            with open('trape.config', 'w') as f:
                json.dump(config, f, indent="\t")
        else:
            self.trape_config()

    def injectCSS_Paths(self, code):
        """Return ``code`` with the asset placeholders replaced.

        Each ``[..._HREF]`` placeholder is substituted with the ``src``
        token of the matching ``self.CSSFiles`` entry (by position).
        """
        code = code.replace("[FAVICON_HREF]", self.CSSFiles[0]['src'])
        code = code.replace("[FAVICON_PNG_HREF]",self.CSSFiles[1]['src'])
        code = code.replace("[BASE_ICONS_HREF]", self.CSSFiles[2]['src'])
        code = code.replace("[STYLES_HREF]", self.CSSFiles[3]['src'])
        code = code.replace("[NORMALIZE_HREF]", self.CSSFiles[4]['src'])
        code = code.replace("[SERVICES_ICONS_HREF]", self.CSSFiles[5]['src'])
        return code


# Autocompletion of console
if "nt" in os.name:
    pass
else:
    import readline
    readline.parse_and_bind("tab:complete")

Full Screen

Full Screen

test_utils.py

Source:test_utils.py Github

copy

Full Screen

import pytest
import numpy as np
import stingray.utils as utils
from scipy.stats import sem

np.random.seed(20150907)


class TestRebinData(object):
    """Tests for ``utils.rebin_data`` on an evenly sampled, constant series."""

    @classmethod
    def setup_class(cls):
        cls.dx = 1.0
        cls.n = 10
        cls.counts = 2.0
        # Bin centres start at dx / 2 and are spaced by dx.
        cls.x = np.arange(cls.dx / 2, cls.dx / 2 + cls.n * cls.dx, cls.dx)
        cls.y = np.zeros_like(cls.x) + cls.counts
        cls.yerr = np.sqrt(cls.y)

    def test_new_stepsize(self):
        dx_new = 2.0
        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,
                                                       self.yerr)
        assert np.allclose(step_size, dx_new / self.dx)

    def test_arrays(self):
        dx_new = 2.0
        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,
                                                       self.yerr)
        assert isinstance(xbin, np.ndarray)
        assert isinstance(ybin, np.ndarray)

    def test_length_matches(self):
        dx_new = 2.0
        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,
                                                       self.yerr)
        assert xbin.shape[0] == ybin.shape[0]

    def test_binned_counts(self):
        dx_new = 2.0
        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,
                                                       self.yerr)
        ybin_test = np.zeros_like(xbin) + self.counts * dx_new / self.dx
        assert np.allclose(ybin, ybin_test)

    def test_uneven_bins(self):
        dx_new = 1.5
        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,
                                                       self.yerr)
        assert np.isclose(xbin[1] - xbin[0], dx_new)

    def test_uneven_binned_counts(self):
        dx_new = 1.5
        xbin, ybin, yerr, step_size = utils.rebin_data(self.x, self.y, dx_new,
                                                       self.yerr)
        ybin_test = np.zeros_like(xbin) + self.counts * dx_new / self.dx
        assert np.allclose(ybin_test, ybin)

    def test_rebin_data_should_raise_error_when_method_is_not_allowed(self):
        dx_new = 2.0
        with pytest.raises(ValueError):
            utils.rebin_data(self.x, self.y, dx_new, self.yerr,
                             method='not_allowed_method')

    def test_rebin_variable_input_sampling(self):
        # Three segments with different sampling intervals.
        x1 = np.linspace(0, 10, 11)
        x2 = np.linspace(10.33, 20.0, 30)
        x3 = np.linspace(21, 30, 10)
        x = np.hstack([x1, x2, x3])
        counts = 2.0
        y = np.zeros_like(x) + counts
        yerr = np.sqrt(y)
        dx_new = 1.5
        xbin, ybin, yerr_bin, step_size = utils.rebin_data(x, y, dx_new, yerr)
        assert np.allclose(ybin, counts * step_size)
        assert len(xbin) == 20

    def test_rebin_variable_input_mean(self):
        x1 = np.linspace(0, 10, 11)
        x2 = np.linspace(10.33, 20.0, 30)
        x3 = np.linspace(21, 30, 10)
        x = np.hstack([x1, x2, x3])
        counts = 2.0
        y = np.zeros_like(x) + counts
        yerr = np.sqrt(y)
        dx_new = 1.5
        xbin, ybin, yerr_bin, step_size = utils.rebin_data(
            x, y, dx_new, yerr, method="average")
        assert len(xbin) == 20
        assert np.allclose(ybin, counts)


class TestRebinDataLog(object):
    """Tests for ``utils.rebin_data_log`` against precomputed references."""

    @classmethod
    def setup_class(cls):
        cls.dx = 1
        cls.xmax = 21
        cls.xmin = 1
        cls.x = np.arange(cls.xmin, cls.xmax, cls.dx)
        cls.y = np.arange(cls.xmin, cls.xmax, cls.dx)
        cls.y_err = np.ones_like(cls.y)
        # Reference values the tests below compare against.
        cls.true_bins = np.array(
            [1., 1.1, 1.21, 1.331, 1.4641, 1.61051, 1.771561,
             1.9487171, 2.14358881, 2.35794769, 2.59374246, 2.85311671])
        cls.true_bin_edges = np.array(
            [0.5, 1.5, 2.6000000000000001, 3.81, 5.141, 6.6051, 8.21561,
             9.987171, 11.9358881, 14.07947691, 16.437424601, 19.0311670611,
             21.88428376721])
        cls.true_values = np.array(
            [1., 2., 3., 4.5, 6., 7.5, 9., 10.5, 13., 15.5, 18., 20.])
        cls.true_nsamples = np.array([1, 1, 1, 2, 1, 2, 1, 2, 3, 2, 3, 1])
        cls.f = 0.1

    def test_rebin_data_log_runs(self):
        _, _, _, _ = utils.rebin_data_log(self.x, self.y, self.f,
                                          y_err=self.y_err, dx=self.dx)

    def test_method_fails_if_x_and_y_of_unequal_length(self):
        with pytest.raises(ValueError):
            _, _, _, _ = utils.rebin_data_log(self.x[1:], self.y, self.f,
                                              y_err=self.y_err, dx=self.dx)

    def test_method_fails_if_y_and_yerr_of_unequal_length(self):
        with pytest.raises(ValueError):
            _, _, _, _ = utils.rebin_data_log(self.x, self.y, self.f,
                                              y_err=self.y_err[1:], dx=self.dx)

    def test_all_outputs_have_the_same_dimension_except_binx(self):
        binx, biny, binyerr, nsamples = utils.rebin_data_log(self.x, self.y,
                                                             self.f,
                                                             y_err=self.y_err,
                                                             dx=self.dx)
        # binx describes the bin _edges_ rather than midpoints, so has one
        # more entry than biny and the rest
        assert binx.shape[0] == biny.shape[0] + 1
        assert biny.shape[0] == binyerr.shape[0]
        assert binyerr.shape[0] == nsamples.shape[0]

    def test_binning_works_correctly(self):
        binx, _, _, _ = utils.rebin_data_log(self.x, self.y, self.f,
                                             y_err=self.y_err, dx=self.dx)
        assert np.allclose(np.diff(binx), self.true_bins)

    def test_bin_edges_are_correct(self):
        binx, _, _, _ = utils.rebin_data_log(self.x, self.y, self.f,
                                             y_err=self.y_err, dx=self.dx)
        assert np.allclose(binx, self.true_bin_edges)

    def test_bin_values_are_correct(self):
        _, biny, _, _ = utils.rebin_data_log(self.x, self.y, self.f,
                                             y_err=self.y_err, dx=self.dx)
        assert np.allclose(biny, self.true_values)

    def test_nsamples_are_correctly_calculated(self):
        _, _, _, nsamples = utils.rebin_data_log(self.x, self.y, self.f,
                                                 y_err=self.y_err, dx=self.dx)
        assert np.allclose(nsamples, self.true_nsamples)

    def test_method_works_on_complex_numbers(self):
        re = np.arange(self.xmin, self.xmax, self.dx)
        im = np.arange(self.xmin, self.xmax, self.dx)
        y = np.zeros(re.shape[0], dtype=np.complex64)
        yerr = np.zeros(re.shape[0], dtype=complex)
        for k, (r, i) in enumerate(zip(re, im)):
            y[k] = r + i * 1j
            yerr[k] = r + i * 1j
        # Expected result: the real reference values on both components.
        real_binned = np.zeros(self.true_values.shape[0], dtype=complex)
        for i in range(self.true_values.shape[0]):
            real_binned[i] = self.true_values[i] + self.true_values[i] * 1j
        _, _, binyerr_real, _ = \
            utils.rebin_data_log(self.x, y.real, self.f, y_err=yerr.real,
                                 dx=self.dx)
        _, biny, binyerr, _ = \
            utils.rebin_data_log(self.x, y, self.f, y_err=yerr,
                                 dx=self.dx)
        assert np.iscomplexobj(biny)
        assert np.iscomplexobj(binyerr)
        assert np.allclose(biny, real_binned)
        assert np.allclose(binyerr, binyerr_real + 1.j * binyerr_real)

    def test_return_float_with_floats(self):
        _, biny, binyerr, _ = utils.rebin_data_log(
            self.x, self.y, self.f, y_err=self.y_err, dx=self.dx)
        assert not np.iscomplexobj(biny)
        assert not np.iscomplexobj(binyerr)


class TestUtils(object):
    """Tests for assorted small helpers in ``stingray.utils``."""

    def test_optimal_bin_time(self):
        assert utils.optimal_bin_time(512, 2.1) == 2
        assert utils.optimal_bin_time(512, 3.9) == 2
        assert utils.optimal_bin_time(512, 4.1) == 4

    def test_order_list_of_arrays(self):
        alist = [np.array([1, 0]), np.array([2, 3])]
        order = np.argsort(alist[0])
        assert np.allclose(
            np.array([np.array([0, 1]), np.array([3, 2])]),
            np.array(utils.order_list_of_arrays(alist, order)))
        # The same call with a dict of arrays.
        alist = {"a": np.array([1, 0]), "b": np.array([2, 3])}
        alist_new = utils.order_list_of_arrays(alist, order)
        assert np.allclose(np.array([0, 1]), alist_new["a"])
        assert np.allclose(np.array([3, 2]), alist_new["b"])
        # An input that is neither list nor dict is expected to give None.
        alist = 0
        assert utils.order_list_of_arrays(alist, order) is None

    def test_look_for_array(self):
        assert utils.look_for_array_in_array(np.arange(2), np.arange(1, 3))
        assert not utils.look_for_array_in_array(np.arange(2),
                                                 np.arange(2, 4))

    def test_assign_value_if_none(self):
        assert utils.assign_value_if_none(None, 2) == 2
        assert utils.assign_value_if_none(1, 2) == 1

    def test_apply_function_if_none(self):
        assert utils.apply_function_if_none(None, [1, 2, 3], np.median) == 2
        assert utils.apply_function_if_none(1, [1, 2, 3], np.median) == 1

    def test_contiguous(self):
        """A more complicated example of intersection of GTIs."""
        array = np.array([0, 1, 1, 0, 1, 1, 1], dtype=bool)
        cont = utils.contiguous_regions(array)
        assert np.allclose(cont, np.array([[1, 3], [4, 7]])), \
            'Contiguous region wrong'

    def test_get_random_state(self):
        # Life, Universe and Everything
        lue = 42
        assert utils.get_random_state(None) is np.random.mtrand._rand
        assert np.all(
            utils.get_random_state(lue).randn(lue) == np.random.RandomState(
                lue).randn(lue))
        assert np.all(utils.get_random_state(np.random.RandomState(lue)).randn(
            lue) == np.random.RandomState(lue).randn(lue))
        with pytest.raises(ValueError):
            utils.get_random_state('foobar')


class TestCreateWindow(object):
    """Tests for ``utils.create_window`` with a 5-point window."""

    @classmethod
    def setup_class(cls):
        cls.N = 5
        cls.uniform_window = 'uniform'
        cls.parzen_window = 'parzen'
        cls.hamming_window = 'hamming'
        cls.hanning_window = 'hanning'
        cls.triangular_window = 'triangular'
        cls.welch_window = 'welch'
        cls.blackmann_window = 'blackmann'
        cls.flattop_window = 'flat-top'

    def test_bad_N(self):
        N_bad = 'abc'
        with pytest.raises(TypeError):
            utils.create_window(N_bad, self.uniform_window)

    def test_bad_window_type(self):
        window_bad = 123
        with pytest.raises(TypeError):
            utils.create_window(self.N, window_bad)

    def test_not_available_window(self):
        window_not = 'kaiser'
        with pytest.raises(ValueError):
            utils.create_window(self.N, window_not)

    def test_N_equals_zero(self):
        N = 0
        window = utils.create_window(N)
        assert len(window) == 0

    def test_uniform_window(self):
        result = np.ones(self.N)
        window = utils.create_window(self.N)
        assert np.allclose(window, result)

    def test_parzen_window(self):
        result = np.array([0, 0.25, 1, 0.25, 0])
        window = utils.create_window(self.N, self.parzen_window)
        assert np.allclose(window, result)

    def test_hamming_window(self):
        result = np.array([0.08, 0.54, 1, 0.54, 0.08])
        window = utils.create_window(self.N, self.hamming_window)
        assert np.allclose(window, result)

    def test_hanning_window(self):
        result = np.array([0, 0.5, 1, 0.5, 0])
        window = utils.create_window(self.N, self.hanning_window)
        assert np.allclose(window, result)

    def test_triangular_window(self):
        result = np.array([0.6, 0.8, 1, 0.8, 0.6])
        window = utils.create_window(self.N, self.triangular_window)
        assert np.allclose(window, result)

    def test_welch_window(self):
        result = np.array([0, 0.75, 1, 0.75, 0])
        window = utils.create_window(self.N, self.welch_window)
        assert np.allclose(window, result)

    def test_blackmann_window(self):
        result = np.array([0.006879, 0.349741, 0.999999, 0.349741, 0.006879])
        window = utils.create_window(self.N, self.blackmann_window)
        assert np.allclose(window, result)

    def test_flat_top_window(self):
        result = np.array(
            [8.67361738e-17, -2.62000000e-01, 4.63600000e+00, -2.62000000e-01,
             8.67361738e-17])
        window = utils.create_window(self.N, self.flattop_window)
        assert np.allclose(window, result)


def test_standard_error():
    # Compare against scipy's standard error of the mean.
    x = np.arange(1, 100)
    x = np.array(np.split(x, 3))
    error = utils.standard_error(x, x.mean(axis=0))
    assert np.allclose(error, sem(x))


def test_nearest_power_of_two():
    assert utils.nearest_power_of_two(4) == 4
    assert utils.nearest_power_of_two(5) == 4
    assert utils.nearest_power_of_two(6) == 8
    assert utils.nearest_power_of_two(7) == 8


def test_find_nearest():
    x = np.arange(1, 10)
    assert utils.find_nearest(x, 2) == (2, 1)
    assert utils.find_nearest(x, 4.5) == (5, 4)
    assert utils.find_nearest(x, 7.4) == (7, 6)


def test_equal_count_energy_ranges():
    energies = np.random.uniform(0.3, 12, 1000000)
    edges = utils.equal_count_energy_ranges(energies, 100, emin=0.3, emax=12)
    # 1,000,000 samples over 100 ranges: 10,000 per range is expected.
    for e0, e1 in zip(edges[:-1], edges[1:]):
        good = energies[(energies >= e0) & (energies < e1)]
        assert np.count_nonzero(good) == 10000


def test_histogram_equiv_numpy():
    x = np.random.uniform(0., 1., 100)
    H, _, = np.histogram(x, bins=5, range=(0., 1.))
    Hn = utils.histogram(x, bins=5, range=np.array([0., 1.]))
    assert np.all(H == Hn)


def test_histogram2d_equiv_numpy():
    x = np.random.uniform(0., 1., 100)
    y = np.random.uniform(2., 3., 100)
    H, _, _ = np.histogram2d(x, y, bins=np.array((5, 5)),
                             range=[(0., 1.), (2., 3.)])
    Hn 
= utils.histogram2d(x, y, bins=np.array([5, 5]),297 range=np.array([[0., 1.], [2., 3.]]))298 assert np.all(H == Hn)299def test_compute_bin():300 bin_edges = np.array([0, 5, 10])301 assert utils.compute_bin(1, bin_edges) == 0302 assert utils.compute_bin(5, bin_edges) == 1...

Full Screen

Full Screen

test_check_package.py

Source:test_check_package.py Github

copy

Full Screen

"""Tests for the utils/check-package script.

This suite does not derive from infra.basetest.BRTest, so no logfile is
produced; output reaches the console only when a test fails.

The 'make check-package' target is already exercised by the 'check-package'
job and is therefore not covered here.
"""
import os
import subprocess
import unittest

import infra


def call_script(args, env, cwd):
    """Run a script and return its stdout and stderr, each as a list of lines."""
    process = subprocess.Popen(
        args,
        cwd=cwd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    captured_out, captured_err = process.communicate()
    return captured_out.splitlines(), captured_err.splitlines()


class TestCheckPackage(unittest.TestCase):
    """Exercise the different ways check-package can be invoked.

    The script may be launched through a relative path, an absolute path, or
    by name via PATH.

    The files to check may be given as relative or absolute paths.

    In in-tree mode (no -b) some in-tree files and all out-of-tree files are
    ignored.

    In out-tree mode (with -b) warnings are generated for those files, but
    external.mk is ignored.
    """

    WITH_EMPTY_PATH = {}
    WITH_UTILS_IN_PATH = {"PATH": infra.basepath("utils") + ":" + os.environ["PATH"]}

    relative = [
        # base_script base_file rel_script rel_file rel_cwd
        ["utils/check-package", "package/atop/atop.mk", "./utils/check-package", "package/atop/atop.mk", ""],
        ["utils/check-package", "package/atop/atop.mk", "./utils/check-package", "./package/atop/atop.mk", ""],
        ["utils/check-package", "package/atop/atop.mk", "../../utils/check-package", "atop.mk", "package/atop"],
        ["utils/check-package", "package/atop/atop.mk", "../../utils/check-package", "./atop.mk", "package/atop"],
        ["utils/check-package", "package/atop/atop.mk", "../utils/check-package", "atop/atop.mk", "package"],
        ["utils/check-package", "package/atop/atop.mk", "../utils/check-package", "./atop/atop.mk", "package"],
        ["utils/check-package", "package/atop/Config.in", "./utils/check-package", "package/atop/Config.in", ""],
        ["utils/check-package", "package/atop/Config.in", "./utils/check-package", "./package/atop/Config.in", ""],
        ["utils/check-package", "package/atop/Config.in", "../../utils/check-package", "Config.in", "package/atop"],
        ["utils/check-package", "package/atop/Config.in", "../../utils/check-package", "./Config.in", "package/atop"],
        ["utils/check-package", "package/atop/Config.in", "../utils/check-package", "atop/Config.in", "package"],
        ["utils/check-package", "package/atop/Config.in", "../utils/check-package", "./atop/Config.in", "package"],
    ]

    def _leading_count(self, stderr, index, marker):
        """Assert stderr[index] contains marker and return its first token as an int."""
        line = stderr[index]
        self.assertIn(marker, line, stderr)
        return int(line.split()[0])

    def _assert_processed_with_warnings(self, stderr):
        """Assert that a file was processed and that warnings were reported for it."""
        self.assert_file_was_processed(stderr)
        self.assert_warnings_generated_for_file(stderr)

    def _check_every_invocation(self, flags):
        """Run every script/file path combination from `relative` with `flags`.

        For each table row the relative file is tried first, then the absolute
        one; for each file the script is launched by relative path, by
        absolute path, and by bare name through PATH, in that order. Every run
        must report processed lines.
        """
        for base_script, base_file, rel_script, rel_file, rel_cwd in self.relative:
            abs_script = infra.basepath(base_script)
            abs_file = infra.basepath(base_file)
            cwd = infra.basepath(rel_cwd)
            launchers = (
                (rel_script, self.WITH_EMPTY_PATH),
                (abs_script, self.WITH_EMPTY_PATH),
                ("check-package", self.WITH_UTILS_IN_PATH),
            )
            for target in (rel_file, abs_file):
                for script, env in launchers:
                    _, messages = call_script([script] + flags + [target], env, cwd)
                    self.assert_file_was_processed(messages)

    def assert_file_was_processed(self, stderr):
        """Fail unless the first stderr line reports more than zero lines processed."""
        self.assertGreater(self._leading_count(stderr, 0, "lines processed"), 0)

    def assert_file_was_ignored(self, stderr):
        """Fail unless the first stderr line reports exactly zero lines processed."""
        self.assertEqual(self._leading_count(stderr, 0, "lines processed"), 0)

    def assert_warnings_generated_for_file(self, stderr):
        """Fail unless the second stderr line reports more than zero warnings generated."""
        self.assertGreater(self._leading_count(stderr, 1, "warnings generated"), 0)

    def test_run(self):
        """Walk through the supported invocation styles in one top-to-bottom pass."""
        empty_env = self.WITH_EMPTY_PATH
        utils_env = self.WITH_UTILS_IN_PATH

        # an in-tree file can be checked with the script called by relative
        # path, by absolute path and from PATH
        self._check_every_invocation([])

        # some in-tree files are ignored
        _, messages = call_script(
            ["./utils/check-package", "package/pkg-generic.mk"],
            empty_env, infra.basepath())
        self.assert_file_was_ignored(messages)

        _, messages = call_script(
            ["./utils/check-package", "-b", "package/pkg-generic.mk"],
            empty_env, infra.basepath())
        self.assert_file_was_processed(messages)

        # with -b, a file can likewise be checked with the script called by
        # relative path, by absolute path and from PATH
        self._check_every_invocation(["-b"])

        # out-of-tree files are ignored without -b but can generate warnings
        # with -b
        abs_path = infra.filepath("tests/utils/br2-external")
        rel_file = "Config.in"
        abs_file = os.path.join(abs_path, rel_file)

        _, messages = call_script(["check-package", rel_file], utils_env, abs_path)
        self.assert_file_was_ignored(messages)

        _, messages = call_script(["check-package", abs_file], utils_env, infra.basepath())
        self.assert_file_was_ignored(messages)

        warnings, messages = call_script(["check-package", "-b", rel_file], utils_env, abs_path)
        self._assert_processed_with_warnings(messages)
        self.assertIn("{}:1: empty line at end of file".format(rel_file), warnings)

        warnings, messages = call_script(["check-package", "-b", abs_file], utils_env, infra.basepath())
        self._assert_processed_with_warnings(messages)
        self.assertIn("{}:1: empty line at end of file".format(abs_file), warnings)

        # external.mk is ignored only when in the root path of a br2-external
        rel_file = "external.mk"
        abs_file = os.path.join(abs_path, rel_file)

        _, messages = call_script(["check-package", "-b", rel_file], utils_env, abs_path)
        self.assert_file_was_ignored(messages)

        _, messages = call_script(["check-package", "-b", abs_file], utils_env, infra.basepath())
        self.assert_file_was_ignored(messages)

        abs_path = infra.filepath("tests/utils/br2-external/package/external")
        abs_file = os.path.join(abs_path, rel_file)

        warnings, messages = call_script(["check-package", "-b", rel_file], utils_env, abs_path)
        self._assert_processed_with_warnings(messages)
        self.assertIn("{}:1: should be 80 hashes (http://nightly.buildroot.org/#writing-rules-mk)".format(rel_file), warnings)

        warnings, messages = call_script(["check-package", "-b", abs_file], utils_env, infra.basepath())
        self._assert_processed_with_warnings(messages)
        # ... (the listing this snippet was taken from is truncated at this point)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run toolium automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful