How to use record_entry method in autotest

Best Python code snippet using autotest_python

core_helpers.py

Source:core_helpers.py Github

copy

Full Screen

1#!/usr/bin/python2# Copyright (c) 2009, Purdue University3# All rights reserved.4# 5# Redistribution and use in source and binary forms, with or without6# modification, are permitted provided that the following conditions are met:7# 8# Redistributions of source code must retain the above copyright notice, this9# list of conditions and the following disclaimer.10#11# Redistributions in binary form must reproduce the above copyright notice, this12# list of conditions and the following disclaimer in the documentation and/or13# other materials provided with the distribution.14# 15# Neither the name of the Purdue University nor the names of its contributors16# may be used to endorse or promote products derived from this software without17# specific prior written permission.18# 19# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"20# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE21# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE22# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE23# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL24# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR25# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER26# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,27# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.29"""Core helper functions."""30__copyright__ = 'Copyright (C) 2009, Purdue University'31__license__ = 'BSD'32__version__ = '#TRUNK#'33import constants34import errors35import helpers_lib36import datetime37import dns.zone38import IPy39class CoreHelpers(object):40 """Library of helper functions that extend the core functions."""41 def __init__(self, core_instance):42 """Sets up core instance43 Inputs:44 core_instance: instance of RosterCore45 """46 self.core_instance = core_instance47 self.db_instance = core_instance.db_instance48 self.user_instance = core_instance.user_instance49 self.log_instance = core_instance.log_instance50 ### These functions just expose helpers_lib functions for the 51 ### XML-RPC server. For doc strings see helpers_lib52 def ListGroupPermissions(self):53 return self.db_instance.data_validation_instance.ListGroupPermissions()54 def ReverseIP(self, ip_address):55 return helpers_lib.ReverseIP(ip_address)56 def UnReverseIP(self, ip_address):57 return helpers_lib.UnReverseIP(ip_address)58 def CIDRExpand(self, cidr_block, begin=None, end=None):59 return helpers_lib.CIDRExpand(cidr_block, begin, end)60 def ExpandIPV6(self, ip_address):61 return helpers_lib.ExpandIPV6(ip_address)62 def GetViewsByUser(self, username):63 """Lists view names available to given username64 Inputs:65 username: string of user name66 Outputs:67 list: list of view name strings68 """69 function_name, current_args = helpers_lib.GetFunctionNameAndArgs()70 self.user_instance.Authorize(function_name)71 views = set([])72 success = False73 users_dict = self.db_instance.GetEmptyRowDict('users')74 users_dict['user_name'] = username75 user_group_assignments_dict = self.db_instance.GetEmptyRowDict(76 'user_group_assignments')77 groups_dict = self.db_instance.GetEmptyRowDict('groups')78 forward_zone_permissions_dict = self.db_instance.GetEmptyRowDict(79 'forward_zone_permissions')80 zones_dict = self.db_instance.GetEmptyRowDict('zones')81 zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(82 'zone_view_assignments')83 self.db_instance.StartTransaction()84 try:85 joined_list = self.db_instance.ListRow(86 'users', users_dict, 'user_group_assignments',87 user_group_assignments_dict,88 'groups', groups_dict, 'forward_zone_permissions',89 forward_zone_permissions_dict, 'zones', zones_dict,90 'zone_view_assignments', zone_view_assignments_dict)91 finally:92 self.db_instance.EndTransaction()93 for view_dict in joined_list:94 views.add(view_dict['zone_view_assignments_view_dependency'].split(95 '_dep')[0])96 success = True97 return views98 def AddFormattedRecords(self, zone_name, zone_file_string,99 view):100 """Adds records from a string of a partial zone file101 Inputs:102 zone_name: string of zone name103 zone_file_string: string of the file contents104 view: string of view name105 Outputs:106 int: Amount of records added to db.107 """108 all_views = self.core_instance.ListZones(zone_name=zone_name)[109 zone_name]110 origin = all_views[view][u'zone_origin']111 zone = None112 #If the file doesn't have an origin, we need to give it something113 #otherwise dns.zone will raise an UnknownOrigin exception114 if( "$ORIGIN" not in zone_file_string ):115 zone = dns.zone.from_text(str(zone_file_string), check_origin=False,116 origin=origin)117 else:118 zone = dns.zone.from_text(str(zone_file_string), check_origin=False) 119 make_record_args_list = helpers_lib.CreateRecordsFromZoneObject(zone, 120 zone_name=zone_name, view_name=view, zone_origin=origin,121 views_list=all_views)122 self.ProcessRecordsBatch(add_records=make_record_args_list, 123 zone_import=True)124 return len(make_record_args_list)125 def GetCIDRBlocksByView(self, view, username):126 """Lists CIDR blocks available to a username in a given view127 Inputs:128 view: string of view name129 username: string of user name130 Outputs:131 list: list of cidr block strings132 """133 self.user_instance.Authorize('GetCIDRBlocksByView')134 cidrs = set([])135 users_dict = self.db_instance.GetEmptyRowDict('users')136 users_dict['user_name'] = username137 views_dict = self.db_instance.GetEmptyRowDict('views')138 views_dict['view_name'] = view139 user_group_assignments_dict = self.db_instance.GetEmptyRowDict(140 'user_group_assignments')141 groups_dict = self.db_instance.GetEmptyRowDict('groups')142 reverse_range_permissions_dict = self.db_instance.GetEmptyRowDict(143 'reverse_range_permissions')144 zones_dict = self.db_instance.GetEmptyRowDict('zones')145 reverse_range_zone_assignments_dict = self.db_instance.GetEmptyRowDict(146 'reverse_range_zone_assignments')147 zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(148 'zone_view_assignments')149 self.db_instance.StartTransaction()150 try:151 joined_list = self.db_instance.ListRow(152 'views', views_dict,153 'users', users_dict, 'user_group_assignments',154 user_group_assignments_dict,155 'groups', groups_dict, 'reverse_range_permissions',156 reverse_range_permissions_dict, 'zones', zones_dict,157 'zone_view_assignments', zone_view_assignments_dict,158 'reverse_range_zone_assignments', reverse_range_zone_assignments_dict)159 finally:160 self.db_instance.EndTransaction()161 for cidr_dict in joined_list:162 cidrs.add(cidr_dict['reverse_range_zone_assignments_cidr_block'])163 return cidrs164 def GetAssociatedCNAMEs(self, hostname, view_name, zone_name,165 recursive=False):166 """Lists cname's by assignment hostname.167 Inputs:168 hostname: string of hostname169 view_name: string of view name170 zone_name: string of zone name171 Outputs:172 list: list of found cname dictionaries173 """174 self.user_instance.Authorize('GetAssociatedCNAMEs')175 record_arguments_record_assignments_dict = (176 self.db_instance.GetEmptyRowDict(177 'record_arguments_records_assignments'))178 zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(179 'zone_view_assignments')180 zone_view_assignments_dict['zone_view_assignments_zone_name'] = unicode(181 zone_name)182 if( view_name != u'any' ):183 view_dep = u'%s_dep' % view_name184 else: 185 view_dep = u'any'186 zone_view_assignments_dict['zone_view_assignments_view_dependency'] = view_dep187 record_arguments_record_assignments_dict[188 'record_arguments_records_assignments_type'] = u'cname'189 record_arguments_record_assignments_dict[190 'argument_value'] = hostname191 records_dict = self.db_instance.GetEmptyRowDict(192 'records')193 records_dict['record_type'] = u'cname'194 records_dict['record_view_dependency'] = view_dep195 records_dict['record_zone_name'] = zone_name196 self.db_instance.StartTransaction()197 try:198 found_records = self.db_instance.ListRow(199 'records', records_dict,200 'record_arguments_records_assignments',201 record_arguments_record_assignments_dict,202 'zone_view_assignments', zone_view_assignments_dict)203 finally:204 self.db_instance.EndTransaction()205 cnames = []206 for record in found_records:207 new_record = {}208 new_record['record_type'] = record[209 'record_arguments_records_assignments_type']210 new_record['zone_name'] = record['record_zone_name']211 new_record['target'] = record['record_target']212 new_record['ttl'] = record['record_ttl']213 new_record['view_name'] = record[214 'record_view_dependency'].rsplit('_dep')[0]215 new_record['assignment_host'] = record['argument_value']216 new_record['last_user'] = record['record_last_user']217 new_record['zone_origin'] = record['zone_origin']218 cnames.append(new_record)219 if( not recursive ):220 return cnames221 new_cnames = []222 for record in cnames:223 new_cnames.extend(self.GetAssociatedCNAMEs(224 '%s.%s' % (record['target'], record['zone_origin']),225 record['view_name'], record['zone_name'], recursive=recursive))226 cnames.extend(new_cnames)227 del new_cnames228 del found_records229 return cnames230 def ListLatestNamedConfig(self, dns_server_set):231 """Lists the latest named config string given dns server set232 This function is duplicated in233 roster-config-manager/roster_config_manager/tree_exporter.py234 Inputs:235 dns_server_set: string of dns server set name236 Outputs:237 dict: dictionary of latest named config238 """239 named_configs = self.core_instance.ListNamedConfGlobalOptions(240 dns_server_set=dns_server_set)241 current_timestamp = datetime.datetime.now()242 smallest_time_differential = datetime.timedelta(weeks=100000)243 newest_config = None244 for named_config in named_configs:245 time_differential = current_timestamp - named_config['timestamp']246 if( time_differential < smallest_time_differential ):247 smallest_time_differential = time_differential248 newest_config = named_config249 return newest_config250 def RevertNamedConfig(self, dns_server_set, option_id):251 """Revert a Named Config file252 Inputs:253 option_id: the id of config to replicate254 dns_server_set: string of dns server set name255 256 Raises:257 InvalidInputError: DNS server set does not contain id.258 UnexpectedDataError: Multiple configurations found.259 """260 named_config = self.core_instance.ListNamedConfGlobalOptions(261 dns_server_set=dns_server_set, option_id=option_id)262 if( len(named_config) == 0 ):263 raise errors.InvalidInputError(264 'DNS server set "%s" does not contain id "%s"' % (265 dns_server_set, option_id))266 elif( len(named_config) == 1 ):267 self.core_instance.MakeNamedConfGlobalOption(268 dns_server_set, named_config[0]['options'])269 else:270 raise errors.UnexpectedDataError('Multiple configurations found.')271 def MakeAAAARecord(self, target, zone_name, record_args_dict,272 view_name=None, ttl=None):273 """Makes an AAAA record.274 Inputs:275 target: string of target276 zone_name: string of zone name277 record_args_dict: dictionary of record arguments278 view_name: string of view name279 ttl: time to live280 """281 record_args_dict['assignment_ip'] = unicode(IPy.IP(record_args_dict[282 'assignment_ip']).strFullsize())283 self.core_instance.MakeRecord(u'aaaa', target, zone_name, record_args_dict,284 view_name, ttl)285 def MakeSubdomainDelegation(self, zone_name, subdomain_name, nameserver, 286 view_name=u'any'):287 """"Makes a Delegated Subdomain288 Assumes delegation zone is created289 Inputs:290 view_name: string of view name291 zone_name: string of zone name292 subdomain_name: string of subdomain name 293 nameserver: string of fully qualified nameserver294 Raises:295 InvalidInputError: Zone does not exist.296 """297 self.core_instance.MakeRecord(u'ns', subdomain_name, zone_name,298 {u'name_server':nameserver}, view_name)299 def GetPTRTarget(self, long_target, view_name=u'any'):300 """Gets the short PTR target given the long PTR target301 Inputs:302 long_target: String of long PTR target303 view_name: String of view name304 Raises:305 InvalidInputError: No suitable reverse range zone assignments found.306 Outputs:307 string: String of short PTR target308 """309 if( not long_target.endswith('in-addr.arpa.') and not310 long_target.endswith('ip6.arpa.') ):311 long_target = self.ReverseIP(long_target)312 zone_assignment = None313 reverse_range_zone_assignments = (314 self.core_instance.ListReverseRangeZoneAssignments())315 ip_address = IPy.IP(self.UnReverseIP(long_target))316 for zone_assignment in reverse_range_zone_assignments:317 if( zone_assignment in reverse_range_zone_assignments ):318 if( ip_address in IPy.IP(319 reverse_range_zone_assignments[zone_assignment]) ):320 break321 else:322 raise errors.InvalidInputError(323 'No suitable reverse range zone assignments found.')324 zone_detail = self.core_instance.ListZones(view_name=view_name)325 zone_origin = zone_detail[zone_assignment][view_name]['zone_origin']326 # Count number of characters in zone origin, add one to count the extra327 # period and remove that number of characters from the target.328 zone_origin_length = len(zone_origin) + 1329 short_target = long_target[:-zone_origin_length]330 return (short_target, zone_assignment)331 def MakePTRRecord(self, target, record_args_dict,332 view_name=u'any', ttl=None):333 """Makes a ptr record.334 Inputs:335 target: string of target336 record_args_dict: dictionary of record arguments337 view_name: string of view name338 ttl: string of ttl339 """340 target, zone_assignment = self.GetPTRTarget(target, view_name)341 if( record_args_dict['assignment_host'].startswith('@.') ):342 record_args_dict['assignment_host'] = record_args_dict[343 'assignment_host'].lstrip('@.')344 self.core_instance.MakeRecord(u'ptr', target, zone_assignment,345 record_args_dict, view_name, ttl)346 def RemovePTRRecord(self, record_type, target, zone_name, record_args_dict,347 view_name, ttl=None):348 """Removes a ptr record.349 Inputs:350 target: string of target351 record_args_dict: dictionary of record arguments352 view_name: string of view name353 ttl: string of ttl354 """355 if( record_args_dict['assignment_host'].startswith('@.') ):356 record_args_dict['assignment_host'] = record_args_dict[357 'assignment_host'].lstrip('@.')358 self.core_instance.RemoveRecord(record_type, target, zone_name,359 record_args_dict, view_name, ttl)360 def MakeIPv4ClasslessReverseDelegation(self, name_server, cidr_block,361 view_name=u'any', ttl=None):362 """Creates classless forwarding for reverse DNS lookups363 Inputs:364 name_server: nameserver to add for reverse delegation365 cidr_block: string of CIDR block366 view_name: string of view for the reverse zone, defaults to 'any'367 ttl: time-to-live for the newly added records, defaults to zone ttl368 Raises:369 InvalidInputError: nameserver required370 InvalidInputError: cidr block range required371 InvalidInputError: Not a valid zone name or CIDR block372 """373 view_dep = '%s_dep' % view_name374 cidr_octets = cidr_block.split('.')375 num_octets = len(cidr_octets)376 if( num_octets != 4 ):377 raise errors.InvalidInputError('Invalid CIDR octet number')378 cidr_block_target = cidr_octets[-1]379 broadcast_last_octet = cidr_block_target.split('/')[0]380 netmask = cidr_block_target.split('/')[1]381 if( str.isdigit(str(broadcast_last_octet)) and str.isdigit(str(netmask))):382 if( int(netmask) < 25 or int(netmask) > 31 ):383 raise errors.InvalidInputError('Invalid CIDR netmask: %s' % netmask)384 if( int(broadcast_last_octet) < 0 or int(broadcast_last_octet) > 255):385 raise errors.InvalidInputError('Invalid CIDR last octet')386 else:387 raise errors.InvalidInputError('Invalid CIDR last octet/netmask')388 for i in range(1, len(cidr_octets) - 1):389 if( str.isdigit(str(cidr_octets[i])) ):390 if( int(cidr_octets[i]) < 0 or int(cidr_octets[i]) > 255):391 raise errors.InvalidInputError('Invalid CIDR octet %s' %392 cidr_octets[i])393 else:394 raise errors.InvalidInputError('Invalid CIDR octet %s' %395 cidr_octets[i])396 cidr_for_ipy = cidr_octets[0]397 for i in range(1, num_octets - 1):398 cidr_for_ipy = '%s.%s' % (cidr_for_ipy, cidr_octets[i])399 cidr_for_ipy = '%s.%s/%s' % (cidr_for_ipy, '0', netmask)400 expanded_cidr = self.CIDRExpand(cidr_for_ipy,401 begin=long(broadcast_last_octet))402 expanded_cidr.remove(expanded_cidr[-1])403 zone_name = self.ListZoneByIPAddress(cidr_block)404 if( zone_name is None ):405 raise errors.InvalidInputError(406 'ERROR: zone that includes cidr block %s not found' % cidr_block)407 records = self.core_instance.ListRecords(zone_name=zone_name)408 for record in records:409 for ip in expanded_cidr:410 if( self.ReverseIP(ip).split('.')[0] == record['target'] ):411 raise errors.InvalidInputError('ERROR: existing record(s) with '412 'target: %s overlaps given cidr: %s' %413 (record['target'], cidr_block))414 records_batch = []415 cidr_last_target = int(broadcast_last_octet) + pow(2, 32 - int(netmask))416 ns_target = cidr_block_target417 ns_args_dict = self.core_instance.GetEmptyRecordArgsDict(u'ns')418 ns_args_dict['name_server'] = name_server419 ns_record = {'record_type': u'ns', 'record_target': ns_target,420 'record_zone_name': zone_name, 'view_name': view_name,421 'record_view_dependency': view_dep,422 'record_arguments': ns_args_dict}423 records_batch.append(ns_record)424 for ip in expanded_cidr:425 reverse_ip = self.ReverseIP(ip)426 split_reverse_ip = reverse_ip.split('.')427 target = split_reverse_ip[0]428 reverse_ip_for_record = '%s.%s' % (target, cidr_block_target)429 for i in range(1, len(split_reverse_ip)):430 reverse_ip_for_record = '%s.%s' % (reverse_ip_for_record,431 split_reverse_ip[i])432 if( target == broadcast_last_octet or target == unicode(433 cidr_last_target - 1) ):434 continue435 cname_args_dict = self.core_instance.GetEmptyRecordArgsDict(u'cname')436 cname_args_dict['assignment_host'] = unicode(reverse_ip_for_record)437 cname_record = {'record_type': u'cname', 'record_target': unicode(target),438 'record_zone_name': zone_name, 'view_name': view_name,439 'record_view_dependency': view_dep,440 'record_arguments': cname_args_dict}441 records_batch.append(cname_record)442 self.ProcessRecordsBatch(add_records=records_batch)443 def MakeIPv4ClasslessReverseDelegatedTargetZone(self, cidr_block):444 """Creates a delegated reverse zone445 Inputs:446 cidr_block: string of IPv4 cidr block447 Raises:448 InvalidInputError: Not a valid cidr block449 """450 cidr_octets = cidr_block.split('.')451 cidr_block_target = cidr_octets[-1]452 broadcast_last_octet = cidr_block_target.split('/')[0]453 netmask = cidr_block_target.split('/')[1]454 if( str.isdigit(str(broadcast_last_octet)) and str.isdigit(str(netmask))):455 if( int(netmask) < 25 or int(netmask) > 31 or456 int(broadcast_last_octet) < 0 or int(broadcast_last_octet) > 255 ):457 raise errors.InvalidInputError('Invalid CIDR block')458 else:459 raise errors.InvalidInputError('Invalid CIDR block')460 for i in range(1, len(cidr_octets) - 1):461 if( str.isdigit(str(cidr_octets[i])) ):462 if( int(cidr_octets[i]) < 0 or int(cidr_octets[i]) > 255):463 raise errors.InvalidInputError('Invalid CIDR block')464 else:465 raise errors.InvalidInputError('Invalid CIDR block')466 zone_name = u'in-addr.arpa'467 for i in range(0, len(cidr_octets)):468 zone_name = u'%s.%s' % (cidr_octets[i], zone_name)469 zone_type = u'master'470 zone_origin = u'%s.' % zone_name471 self.core_instance.MakeZone(zone_name, zone_type, zone_origin)472 def ListAccessLevels(self):473 """Lists access levels from constants for both integer and string keys474 Outputs:475 dict: dictionary of access levels with both string and integer-string keys476 Example:477 {'32': 32, '64': 64, '128': 128, 'user': 32, 'unlocked_user': 64,478 'dns_admin': 128}479 """480 access_levels_dict = {}481 for key, value in constants.ACCESS_LEVELS.iteritems():482 access_levels_dict[str(value)] = value483 access_levels_dict[key] = value484 return access_levels_dict485 def ListAvailableIpsInCIDR(self, cidr_block, num_ips=1, view_name=None,486 zone_name=None):487 """Finds first available ips. Only lists as many IPs as are available.488 Returns empty list if no IPs are available in given cidr block and a489 truncated list if only a portion of IPs are available.490 Inputs:491 cidr_block: string of ipv4 or ipv6 cidr block492 Raises:493 InvalidInputError: IP is in a reserved IP space.494 InvalidInputError: Not a valid cidr block495 Outputs:496 list: list of strings of ip addresses497 """498 try:499 cidr_block_ipy = IPy.IP(cidr_block)500 except ValueError:501 raise errors.InvalidInputError(502 '%s is not a valid cidr block' % cidr_block)503 reserved_ips = []504 if( cidr_block_ipy.version() == 6 ):505 reserved = constants.RESERVED_IPV6506 elif( cidr_block_ipy.version() == 4 ):507 reserved = constants.RESERVED_IPV4508 for cidr in reserved:509 reserved_ips.append(IPy.IP(cidr))510 for reserved_ip in reserved_ips:511 if( IPy.IP(cidr_block) in reserved_ip ):512 raise errors.InvalidInputError(513 '%s is in a reserved IP space' % cidr_block)514 records = self.ListRecordsByCIDRBlock(cidr_block, view_name=view_name,515 zone_name=zone_name)516 taken_ips = []517 avail_ips = []518 for view in records:519 for ip in records[view]:520 taken_ips.append(ip)521 count = 0L522 while( count < cidr_block_ipy.len() ):523 if( len(avail_ips) >= num_ips ):524 break525 if( cidr_block_ipy[count].strFullsize() not in taken_ips ):526 avail_ips.append(cidr_block_ipy[count].strFullsize())527 count += 1L528 return avail_ips529 def ListRecordsByCIDRBlock(self, cidr_block, view_name=None, zone_name=None):530 """Lists records in a given cidr block.531 Inputs:532 cidr_block: string of ipv4 or ipv6 cidr block533 view_name: string of the view534 zone_name: string of the zone535 536 Raise:537 InvalidInputError: The CIDR block specified does not contain a valid IP538 IPIndexError: Record type not indexable by IP539 IPIndexError: Record type unknown. Missing ipv4 or ipv6 dec index540 Outputs:541 dict: A dictionary Keyed by view, keyed by IP, listed by record.542 example:543 {u'test_view':544 {u'192.168.1.8':545 [{u'forward': True,546 u'host': u'host6.university.edu',547 u'zone': u'forward_zone',548 u'zone_origin': u'university.edu.'},549 {u'forward': False,550 u'host': u'host6.university.edu',551 u'zone': u'reverse_zone',552 u'zone_origin': u'1.168.192.in-addr.arpa.'}]}}553 """554 self.user_instance.Authorize('ListRecordsByCIDRBlock')555 record_list = {}556 try:557 IPy.IP(cidr_block)558 except ValueError:559 raise errors.InvalidInputError(560 'The CIDR block specified does not contain a valid IP: %s' % (561 cidr_block))562 cidr_block = IPy.IP(cidr_block).strFullsize(1)563 if( cidr_block.find('/') != -1 ):564 cidr_ip = IPy.IP(cidr_block.split('/')[0])565 cidr_size = int(cidr_block.split('/')[1])566 else:567 cidr_ip = IPy.IP(cidr_block)568 if( cidr_ip.version() == 4 ):569 cidr_size = 32570 elif( cidr_ip.version() == 6 ):571 cidr_size = 128572 else:573 raise errors.InvalidInputError(574 'The CIDR block specified does not contain a valid IP: %s' % (575 cidr_block))576 records_dict = self.db_instance.GetEmptyRowDict('records')577 zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(578 'zone_view_assignments')579 zone_dict = self.db_instance.GetEmptyRowDict('zones')580 record_arguments_records_assignments_dict = (581 self.db_instance.GetEmptyRowDict(582 'record_arguments_records_assignments'))583 if( view_name is not None and584 view_name.endswith('_dep') or view_name == u'any' ):585 records_dict['record_view_dependency'] = view_name586 elif( view_name is not None ):587 records_dict['record_view_dependency'] = '%s_dep' % view_name588 zone_dict['zone_name'] = zone_name589 if( cidr_ip.version() == 4 ):590 decimal_ip = int( cidr_ip.strDec() )591 decimal_ip_lower = (592 (decimal_ip >> (32 - cidr_size) ) << (32 - cidr_size))593 decimal_ip_upper = ( pow(2, 32 - cidr_size) - 1 ) | decimal_ip594 self.db_instance.StartTransaction()595 ip_index_dict = self.db_instance.GetEmptyRowDict('ipv4_index')596 try:597 record_list = self.db_instance.ListRow(598 'ipv4_index', ip_index_dict,599 'records', records_dict,600 'zones', zone_dict,601 'zone_view_assignments', zone_view_assignments_dict,602 'record_arguments_records_assignments', 603 record_arguments_records_assignments_dict,604 column='ipv4_dec_address',605 range_values=(decimal_ip_lower, decimal_ip_upper))606 finally:607 self.db_instance.EndTransaction()608 elif( cidr_ip.version() == 6 ):609 ip_index_dict = self.db_instance.GetEmptyRowDict('ipv6_index')610 if( cidr_size >= 64 ):611 try:612 ip_index_dict[u'ipv6_dec_upper'] = int(cidr_ip.strHex(0)[:-16], 0)613 except ValueError:614 ip_index_dict[u'ipv6_dec_upper'] = 0615 decimal_ip_lower = int('0x%s' % cidr_ip.strHex(0)[18:], 0)616 decimal_ip_lower_lower = (617 (decimal_ip_lower >> (128 - cidr_size)) <<618 (128 - cidr_size))619 decimal_ip_lower_upper = (620 (pow(2,128 - cidr_size) - 1 ) | decimal_ip_lower)621 column = 'ipv6_dec_lower'622 range_values = (decimal_ip_lower_lower, decimal_ip_lower_upper)623 elif( cidr_size < 64 ):624 try:625 decimal_ip_upper = int(cidr_ip.strHex()[:-16], 0)626 except ValueError:627 decimal_ip_upper = 0628 decimal_ip_upper_lower = (629 (decimal_ip_upper >> (64 - cidr_size)) << (64 - cidr_size))630 decimal_ip_upper_upper = (631 (pow(2,64 - cidr_size) - 1 ) | decimal_ip_upper)632 column = 'ipv6_dec_upper'633 range_values = (decimal_ip_upper_lower, decimal_ip_upper_upper)634 self.db_instance.StartTransaction()635 try:636 record_list = self.db_instance.ListRow(637 'ipv6_index', ip_index_dict,638 'records', records_dict,639 'zones', zone_dict,640 'zone_view_assignments', zone_view_assignments_dict,641 'record_arguments_records_assignments',642 record_arguments_records_assignments_dict,643 column=column,644 range_values=range_values)645 finally:646 self.db_instance.EndTransaction()647 ## Parse returned list648 parsed_record_dict = {}649 for _, record_entry in enumerate(record_list):650 if( record_entry[u'record_type'] not in651 constants.RECORD_TYPES_INDEXED_BY_IP ):652 raise errors.IPIndexError('Record type not indexable by '653 'IP: %s' % record_entry)654 if( record_entry[u'record_view_dependency'].endswith('_dep') ):655 record_view = record_entry[u'record_view_dependency'][:-4]656 else:657 record_view = record_entry[u'record_view_dependency']658 if( record_view not in parsed_record_dict ):659 parsed_record_dict[record_view] = {}660 if( u'ipv4_dec_address' in record_entry ):661 record_ip = u'%s' % (662 IPy.IP(record_entry[u'ipv4_dec_address']).strNormal(1))663 if( record_ip not in parsed_record_dict[record_view] ):664 parsed_record_dict[record_view][record_ip] = []665 elif( u'ipv6_dec_upper' in record_entry ):666 decimal_ip = (667 (record_entry[u'ipv6_dec_upper'] << 64) +668 (record_entry[u'ipv6_dec_lower']) )669 record_ip = u'%s' % IPy.IP(decimal_ip).strFullsize(0)670 if( record_ip not in parsed_record_dict[record_view] ):671 parsed_record_dict[record_view][record_ip] = []672 else:673 raise errors.IPIndexError(674 'Record type unknown. Missing ipv4 or ipv6 dec index: %s' % (675 record_entry))676 record_item = {}677 record_item['records_id'] = record_entry['records_id']678 record_item['record_type'] = record_entry['record_type']679 record_item['record_target'] = record_entry['record_target']680 record_item['record_ttl'] = record_entry['record_ttl']681 record_item['record_zone_name'] = record_entry['record_zone_name']682 record_item[u'zone_origin'] = record_entry[u'zone_origin']683 record_item['record_view_dependency'] = record_entry[684 'record_view_dependency']685 #record_item['record_last_updated'] = record_entry['record_last_updated']686 record_item['record_last_user'] = record_entry['record_last_user']687 if record_entry[u'record_view_dependency'].endswith('_dep'):688 record_item[u'view_name'] = record_entry[u'record_view_dependency'][:-4]689 else:690 record_item[u'view_name'] = record_entry[u'record_view_dependency']691 if( record_entry[u'record_type'] == u'a' or692 record_entry[u'record_type'] == u'aaaa' ):693 record_item[u'forward'] = True694 record_item[u'host'] = '%s.%s' % (695 record_entry[u'record_target'],696 record_entry[u'zone_origin'][:-1])697 record_item[u'zone_origin'] = record_entry['zone_origin']698 record_item[u'record_target'] = record_entry['record_target']699 record_item[u'record_args_dict'] = {700 'assignment_ip': record_entry['argument_value']}701 parsed_record_dict[record_view][record_ip].append( record_item )702 elif( record_entry[u'record_type'] == u'ptr' ):703 record_item[u'zone_origin'] = record_entry['zone_origin']704 record_item[u'record_target'] = record_entry['record_target']705 record_item[u'forward'] = False706 record_item[u'host'] = record_entry[u'argument_value'][:-1]707 assignment_ip = helpers_lib.UnReverseIP(708 '%s.%s' % (709 record_entry['record_target'],record_entry['zone_origin']))710 record_item[u'record_args_dict'] = {'assignment_ip': assignment_ip}711 parsed_record_dict[record_view][record_ip].insert(0, record_item )712 return parsed_record_dict713 def ListRecordsByZone(self, zone_name, view_name=None):714 """Lists records in a given zone.715 Inputs:716 zone_name: name of the zone717 view_name: name of the view718 719 Output:720 dict: A dictionary Keyed by view, keyed by IP, listed by record.721 example:722 {u'test_view':723 {u'192.168.1.8':724 [{u'forward': True,725 u'host': u'host6.university.edu',726 u'zone': u'forward_zone',727 u'zone_origin': u'university.edu.'},728 {u'forward': False,729 u'host': u'host6.university.edu',730 u'zone': u'reverse_zone',731 u'zone_origin': u'1.168.192.in-addr.arpa.'}]}}732 """733 self.user_instance.Authorize('ListRecordsByZone')734 record_list = {}735 736 records_dict = self.db_instance.GetEmptyRowDict('records')737 zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(738 'zone_view_assignments')739 zone_dict = self.db_instance.GetEmptyRowDict('zones')740 zone_dict['zone_name'] = zone_name741 record_arguments_records_assignments_dict = (742 self.db_instance.GetEmptyRowDict(743 'record_arguments_records_assignments'))744 ipv4_index_dict = self.db_instance.GetEmptyRowDict('ipv4_index')745 ipv6_index_dict = self.db_instance.GetEmptyRowDict('ipv6_index')746 if( view_name is not None and747 view_name.endswith('_dep') or view_name == u'any' ):748 records_dict['record_view_dependency'] = view_name749 elif( view_name is not None ):750 records_dict['record_view_dependency'] = '%s_dep' % view_name751 args_ipv4 = ['zone_view_assignments', zone_view_assignments_dict,752 'zones', zone_dict, 'records', records_dict,753 'record_arguments_records_assignments',754 record_arguments_records_assignments_dict]755 args_ipv6 = args_ipv4 + ['ipv6_index', ipv6_index_dict]756 args_ipv4.append('ipv4_index')757 args_ipv4.append(ipv4_index_dict)758 self.db_instance.StartTransaction()759 try:760 record_list = self.db_instance.ListRow(*args_ipv4)761 record_list = record_list + self.db_instance.ListRow(*args_ipv6)762 finally:763 self.db_instance.EndTransaction()764 #Parsing Records765 parsed_record_dict = {}766 for record_entry in record_list:767 if( record_entry[u'record_type'] not in 768 constants.RECORD_TYPES_INDEXED_BY_IP ):769 raise errors.IPIndexError('Record type not indexable by '770 'IP: %s' % record_entry)771 if( record_entry[u'record_view_dependency'].endswith('_dep') ):772 record_view = record_entry[u'record_view_dependency'][:-4]773 else:774 record_view = record_entry[u'record_view_dependency']775 if( record_view not in parsed_record_dict ):776 parsed_record_dict[record_view] = {}777 if( u'ipv4_dec_address' in record_entry ):778 record_ip = u'%s' % (779 IPy.IP(record_entry[u'ipv4_dec_address']).strNormal(1))780 if( record_ip not in parsed_record_dict[record_view] ):781 parsed_record_dict[record_view][record_ip] = []782 elif( u'ipv6_dec_upper' in record_entry ):783 decimal_ip = (784 (record_entry[u'ipv6_dec_upper'] << 64) +785 (record_entry[u'ipv6_dec_lower']) )786 record_ip = u'%s' % IPy.IP(decimal_ip).strFullsize(0)787 if( record_ip not in parsed_record_dict[record_view] ):788 parsed_record_dict[record_view][record_ip] = []789 else:790 raise errors.IPIndexError(791 'Record type unknown. Missing ipv4 or ipv6 dec index: %s' % (792 record_entry))793 record_item = {}794 record_item['records_id'] = record_entry['records_id']795 record_item['record_type'] = record_entry['record_type']796 record_item['record_target'] = record_entry['record_target']797 record_item['record_ttl'] = record_entry['record_ttl']798 record_item['record_zone_name'] = record_entry['record_zone_name']799 record_item[u'zone_origin'] = record_entry[u'zone_origin']800 record_item['record_view_dependency'] = record_entry[801 'record_view_dependency']802 record_item['record_last_user'] = record_entry['record_last_user']803 if record_entry[u'record_view_dependency'].endswith('_dep'):804 record_item[u'view_name'] = record_entry[u'record_view_dependency'][:-4]805 else:806 record_item[u'view_name'] = record_entry[u'record_view_dependency']807 if( record_entry[u'record_type'] == u'a' or808 record_entry[u'record_type'] == u'aaaa' ):809 record_item[u'forward'] = True810 record_item[u'host'] = '%s.%s' % (811 record_entry[u'record_target'],812 record_entry[u'zone_origin'][:-1])813 record_item[u'zone_origin'] = record_entry['zone_origin']814 record_item[u'record_target'] = record_entry['record_target']815 record_item[u'record_args_dict'] = {816 'assignment_ip': record_entry['argument_value']}817 parsed_record_dict[record_view][record_ip].append(record_item)818 elif( record_entry[u'record_type'] == u'ptr' ):819 record_item[u'zone_origin'] = record_entry['zone_origin']820 record_item[u'record_target'] = record_entry['record_target']821 record_item[u'forward'] = False822 record_item[u'host'] = record_entry[u'argument_value'][:-1]823 assignment_ip = helpers_lib.UnReverseIP(824 '%s.%s' % (825 record_entry['record_target'],record_entry['zone_origin']))826 record_item[u'record_args_dict'] = {'assignment_ip': assignment_ip}827 parsed_record_dict[record_view][record_ip].insert(0, record_item )828 return parsed_record_dict829 def SortRecordsByHost(self, records_dict):830 """Generates an IP list sorted by record's host831 Inputs:832 record_dict: dictionary keyed by view, then keyed by IP833 dictionary from ListRecordsByCIDRBlock834 and from ListRecordsByZone835 Outputs:836 sorted_list: list of sorted records837 """838 sorted_list = []839 target_sort = []840 for view in records_dict:841 for ip in records_dict[view]:842 for record in records_dict[view][ip]:843 target_sort.append(dict({'ip_address':IPy.IP(ip).strFullsize()}.items() + 844 record.items()))845 sorted_list = sorted(target_sort, key=lambda x: x['host'])846 return sorted_list847 def ListNamedConfGlobalOptionsClient(self, option_id=None,848 dns_server_set=None, timestamp=None):849 """Converts XMLRPC datetime to datetime object and runs850 ListNamedConfGlobalOptions851 Inputs:852 option_id: integer of the option id853 dns_server_set: string of the dns server set name854 timestamp: XMLRPC datetime timestamp855 Outputs:856 list: list of dictionarires from ListNamedConfGlobalOptions857 """858 success = False859 named_conf_global_options = None860 named_conf_global_options = self.core_instance.ListNamedConfGlobalOptions(861 option_id, dns_server_set, timestamp)862 success = True863 return named_conf_global_options864 def ListZoneByIPAddress(self, ip_address):865 """Lists zone name given ip_address866 Inputs:867 ip_address: string of ip address868 Outputs:869 string: string of zone name, ex: 'test_zone'870 """871 user_ip = IPy.IP(ip_address)872 reverse_range_zone_assignments = (873 self.core_instance.ListReverseRangeZoneAssignments())874 for reverse_range_zone_assignment in reverse_range_zone_assignments:875 db_cidr = IPy.IP(reverse_range_zone_assignments[876 reverse_range_zone_assignment])877 if( user_ip in db_cidr ):878 return reverse_range_zone_assignment879 def RemoveCNamesByAssignmentHost(self, hostname, view_name, zone_name):880 """Removes cname's by assignment hostname, will not remove cnames881 that the user does not have permissin to remove. The function will continue882 and pass over that cname.883 Inputs:884 hostname: string of hostname885 view_name: string of view name886 zone_name: string of zone name887 888 Raises:889 UnexpectedDataError: Incorrect number of records found890 Outputs:891 int: number of rows modified892 """893 function_name, current_args = helpers_lib.GetFunctionNameAndArgs()894 row_count = 0895 record_arguments_record_assignments_dict = (896 self.db_instance.GetEmptyRowDict(897 'record_arguments_records_assignments'))898 record_arguments_record_assignments_dict[899 'record_arguments_records_assignments_type'] = u'cname'900 record_arguments_record_assignments_dict[901 'argument_value'] = hostname902 records_dict = self.db_instance.GetEmptyRowDict(903 'records')904 records_dict['record_type'] = u'cname'905 records_dict['record_view_dependency'] = '%s_dep' % view_name906 records_dict['record_zone_name'] = zone_name907 success = False908 try:909 self.db_instance.StartTransaction()910 try:911 found_record_arguments = self.db_instance.ListRow(912 'record_arguments_records_assignments',913 record_arguments_record_assignments_dict)914 remove_record_dict = {}915 for record_argument in found_record_arguments:916 remove_record_dict[record_argument[917 'record_arguments_records_assignments_record_id']] = {918 'assignment_host': record_argument['argument_value']}919 for record_id in remove_record_dict:920 records_dict['records_id'] = record_id921 found_records_dict = self.db_instance.ListRow(922 'records', records_dict)923 if( len(found_records_dict) != 1 ):924 raise errors.UnexpectedDataError(925 'Incorrect number of records found!')926 try:927 self.core_instance.user_instance.Authorize(928 function_name,929 record_data=930 {'target': found_records_dict[0]['record_target'],931 'zone_name': records_dict['record_zone_name'],932 'view_name': records_dict['record_view_dependency'],933 'record_type': records_dict['record_type']},934 current_transaction=True)935 except errors.AuthorizationError:936 continue937 row_count += self.db_instance.RemoveRow(938 'records', found_records_dict[0])939 remove_record_dict[record_id].update({940 'cname_host': found_records_dict[0]['record_target']})941 except:942 self.db_instance.EndTransaction(rollback=True)943 raise944 self.db_instance.EndTransaction()945 success = True946 log_list = []947 for record_id in remove_record_dict:948 log_list.append('record_id:')949 log_list.append(str(record_id))950 for record in remove_record_dict[record_id]:951 log_list.append('%s:' % record)952 log_list.append(remove_record_dict[record_id][record])953 success = True954 finally:955 self.log_instance.LogAction(self.user_instance.user_name, function_name,956 current_args, success)957 return row_count958 def ConstructRecordArgsDictFromRecordID(self, record_id):959 """Constructs the records_arg_dict from the Roster database given only 960 the record id.961 Inputs:962 record_id: int of record id963 Outputs:964 record_args_dict: dictionary of arguments and their values965 """966 record_args_db_row = self.db_instance.GetEmptyRowDict(967 'record_arguments_records_assignments')968 record_args_db_row[969 'record_arguments_records_assignments_record_id'] = record_id970 record_args_db_dict = self.db_instance.ListRow(971 'record_arguments_records_assignments', record_args_db_row)972 record_args_dict = {}973 for key in record_args_db_dict:974 key_entry = key['argument_value']975 if( key_entry.isdigit() ):976 key_entry = int(key_entry)977 record_args_dict[978 key['record_arguments_records_assignments_argument_name']] = key_entry979 return record_args_dict980 def ProcessRecordsBatch(self, delete_records=None, add_records=None,981 zone_import=False):982 """Proccess batches of records983 Inputs:984 delete_records: list of dictionaries of records985 ex: {'record_ttl': 3600, 'record_type': u'a',986 'records_id': 10, 'record_target': u'host1',987 'record_zone_name': u'forward_zone',988 'record_last_user': u'sharrell',989 'record_view_dependency': u'test_view_dep'}990 {'record_type': 'ptr', 'record_target': 'target',991 'view_name': 'view', 'zone_name': 'zone'}992 add_records: list of dictionaries of records993 994 Raises: 995 RecordsBatchError: Record specification too broad996 RecordsBatchError: No record found997 RecordsBatchError: Record already exists998 RecordsBatchError: CNAME already exists999 RecordsBatchError: Duplicate record found1000 Outputs:1001 int: row count1002 """1003 if delete_records is None:1004 delete_records = []1005 if add_records is None:1006 add_records = []1007 function_name, current_args = helpers_lib.GetFunctionNameAndArgs()1008 log_dict = {'delete': [], 'add': []}1009 row_count = 01010 changed_view_dep = []1011 success = False1012 try:1013 for record in add_records:1014 self.db_instance.ValidateRecordArgsDict(record[u'record_type'], 1015 record[u'record_arguments'])1016 self.db_instance.StartTransaction()1017 try:1018 # REMOVE RECORDS1019 for record in delete_records:1020 record_dict = self.db_instance.GetEmptyRowDict('records')1021 record_dict['records_id'] = record['records_id']1022 record_dict['record_type'] = record['record_type']1023 record_dict['record_target'] = record['record_target']1024 record_dict['record_ttl'] = record['record_ttl']1025 if( record['record_view_dependency'].endswith('_dep') or1026 record['record_view_dependency'] == u'any' ):1027 record_dict['record_view_dependency'] = record[1028 'record_view_dependency']1029 else:1030 record_dict['record_view_dependency'] = (1031 '%s_dep' % record['record_view_dependency'])1032 record_dict['record_zone_name'] = record['record_zone_name']1033 record_dict['record_last_user'] = record['record_last_user']1034 record_args_dict = self.ConstructRecordArgsDictFromRecordID(1035 record['records_id'])1036 self.user_instance.Authorize('ProcessRecordsBatch',1037 record_data = {1038 'target': record['record_target'],1039 'zone_name': record['record_zone_name'],1040 'view_name': record_dict['record_view_dependency'],1041 'record_type': record['record_type'],1042 'record_args_dict': record_args_dict},1043 current_transaction=True)1044 rows_deleted = self.db_instance.RemoveRow('records', record_dict)1045 log_dict['delete'].append(record)1046 row_count += 11047 if( rows_deleted > 1 ):1048 raise errors.RecordsBatchError(1049 'Record specification too broad, '1050 'found %d matching records for %s.' % (1051 rows_deleted, record_dict))1052 elif( rows_deleted == 0 ):1053 raise errors.RecordsBatchError(1054 'No record found for :%s' % record_dict)1055 # ADD RECORDS1056 for record in add_records:1057 #Target length check1058 if( not self.db_instance.data_validation_instance.isTarget(1059 record[u'record_target']) ):1060 raise errors.InvalidInputError('Target hostname is invalid. %s' % (1061 record[u'record_target']))1062 view_name = record['record_view_dependency']1063 if( not record['record_view_dependency'].endswith('_dep') and record[1064 'record_view_dependency'] != u'any'):1065 view_name = '%s_dep' % record['record_view_dependency']1066 self.user_instance.Authorize('ProcessRecordsBatch',1067 record_data = {1068 'target': record['record_target'],1069 'zone_name': record['record_zone_name'],1070 'view_name': view_name,1071 'record_type': record['record_type'],1072 'record_args_dict': record['record_arguments']},1073 current_transaction=True)1074 if( record['record_type'] == u'ptr' ):1075 if( record['record_arguments'][1076 'assignment_host'].startswith('@.') ):1077 record['record_arguments']['assignment_host'] = record[1078 'record_arguments']['assignment_host'].lstrip('@.')1079 changed_view_dep.append((view_name, record['record_zone_name']))1080 ttl = None1081 if( 'ttl' in record ):1082 ttl = record['ttl']1083 if( ttl is None ):1084 ttl = constants.DEFAULT_TTL1085 records_dict = {'records_id': None,1086 'record_target': record['record_target'],1087 'record_type': None,1088 'record_ttl': None,1089 'record_zone_name': record['record_zone_name'],1090 'record_view_dependency': view_name,1091 'record_last_user': None}1092 if( record['record_type'] == 'cname' ):1093 all_records = self.db_instance.ListRow('records', records_dict)1094 if( len(all_records) > 0 ):1095 raise errors.RecordsBatchError(1096 'Record already exists with target %s.' % (1097 record['record_target']))1098 records_dict['record_type'] = u'cname'1099 cname_records = self.db_instance.ListRow('records', records_dict)1100 if( len(cname_records) > 0 ):1101 raise errors.RecordsBatchError('CNAME already exists with target '1102 '%s.' % (record['record_target']))1103 record_args_assignment_dict = self.db_instance.GetEmptyRowDict(1104 'record_arguments_records_assignments')1105 records_dict['record_type'] = record['record_type']1106 raw_records = self.db_instance.ListRow(1107 'records', records_dict, 'record_arguments_records_assignments',1108 record_args_assignment_dict)1109 records_dict['record_last_user'] = self.user_instance.GetUserName()1110 records_dict['record_ttl'] = ttl1111 current_records = (1112 helpers_lib.GetRecordsFromRecordRowsAndArgumentRows(1113 raw_records, record['record_arguments']))1114 for current_record in current_records:1115 for arg in record['record_arguments'].keys():1116 if( arg not in current_record ):1117 break1118 if( record['record_arguments'][arg] is None ):1119 continue1120 if( record['record_arguments'][arg] != current_record[arg] ):1121 break1122 else:1123 raise errors.RecordsBatchError('Duplicate record found: %s' %1124 current_record)1125 records_dict['record_type'] = record['record_type']1126 record_id = self.db_instance.MakeRow('records', records_dict)1127 for arg in record['record_arguments'].keys():1128 record_argument_assignments_dict = {1129 'record_arguments_records_assignments_record_id': record_id,1130 'record_arguments_records_assignments_type': record[1131 'record_type'],1132 'record_arguments_records_assignments_argument_name': arg,1133 'argument_value': unicode(record['record_arguments'][arg])}1134 self.db_instance.MakeRow('record_arguments_records_assignments',1135 record_argument_assignments_dict)1136 log_dict['add'].append(record)1137 row_count += 11138 if( records_dict['record_type'] in1139 constants.RECORD_TYPES_INDEXED_BY_IP ):1140 self.core_instance._AddRecordToIpIndex(1141 records_dict['record_type'], records_dict['record_zone_name'],1142 records_dict['record_view_dependency'],1143 record_id, records_dict['record_target'],1144 record['record_arguments'])1145 changed_view_dep = set(changed_view_dep)1146 for view_dep_pair in changed_view_dep:1147 self.core_instance._IncrementSoa(*view_dep_pair, missing_ok=zone_import)1148 except:1149 self.db_instance.EndTransaction(rollback=True)1150 raise1151 self.db_instance.EndTransaction()1152 success = True1153 finally:1154 self.log_instance.LogAction(self.user_instance.user_name, function_name,1155 current_args, success)1156 return row_count1157 def ListSortedHostsByZone(self, zone_name, view_name=None):1158 records_dict = self.ListRecordsByZone(zone_name, view_name=view_name)1159 sorted_records = self.SortRecordsByHost(records_dict)1160 hosts_dict = {}1161 for record in sorted_records:1162 direction = 'Reverse'1163 if( record['forward'] ):1164 direction = 'Forward'1165 if( not hosts_dict.has_key(record[u'view_name']) ):1166 hosts_dict.update({record[u'view_name']: []})1167 new_record = record.copy()1168 new_record['direction'] = direction1169 hosts_dict[record[u'view_name']].append(new_record)1170 return hosts_dict1171 def ListSortedHostsByCIDR(self, cidr, zone_name=None, view_name=None):1172 records_dict = self.ListRecordsByCIDRBlock(cidr, zone_name=zone_name, 1173 view_name=view_name)1174 ip_address_list = self.CIDRExpand(cidr)1175 1176 if ip_address_list is None:1177 ip_address_list = []1178 if( ip_address_list == [] ):1179 for view in records_dict:1180 ip_address_list.extend(records_dict[view].keys())1181 ip_address_list = list(set(ip_address_list))1182 hosts_dict = {}1183 if( len(records_dict) == 0 ):1184 hosts_dict['--'] = []1185 for ip_address in ip_address_list:1186 hosts_dict['--'].append(1187 {'host': '--', 'direction': '--',1188 'ip_address': ip_address, 'record_zone_name': '--'})1189 else:1190 for view in records_dict:1191 if( not hosts_dict.has_key(view) ):1192 hosts_dict.update({view: []})1193 for ip_address in ip_address_list:1194 if( ip_address in records_dict[view] ):1195 for record in records_dict[view][ip_address]:1196 direction = 'Reverse'1197 if( record['forward'] ):1198 direction = 'Forward'1199 new_record = record.copy()1200 new_record['direction'] = direction1201 new_record['ip_address'] = ip_address1202 hosts_dict[view].append(new_record)1203 else:1204 hosts_dict[view].append(1205 {'ip_address': ip_address, 'direction': '--', 'host': '--',1206 'record_zone_name': '--'})1207 return hosts_dict...

Full Screen

Full Screen

delete.py

Source:delete.py Github

copy

Full Screen

1import tkinter as tk2from tkinter import ttk3from .style import Style4class DeleteRecord:5 def __init__(self, file, menu):6 self.menu = menu7 self.file = file8 self.width = 5009 self.height = 40010 self.top = tk.Toplevel()11 self.top.title("Media Database - Delete Record")12 self.top.geometry("%dx%d" % (self.width, self.height))13 self.top.configure(bg=Style.bg)14 # creates the form15 self.title = tk.Label(self.top,16 text="Delete a record",17 bg=Style.table_top,18 fg="white",19 font=("Courier", Style.font_size),20 borderwidth=2,21 relief="ridge")22 self.title.grid(row=1, column=2, padx=110, pady=20)23 self.record_id_lbl = tk.Label(self.top, text="Please enter the record ID that you want to be deleted: ", bg=Style.bg)24 self.record_id_lbl.grid(row=2, column=2, pady=20)25 self.select_record = tk.Entry(self.top)26 self.select_record.grid(row=3, column=2)27 self.record_load_btn = ttk.Button(self.top, text="Load", style="W.TButton", command=self.load_record)28 self.record_load_btn.grid(row=4, column=2, pady=10)29 self.record_lbl = tk.Label(self.top, text="Are you sure you want to delete this record?", bg=Style.bg)30 self.record_lbl.grid(row=5, column=2, pady=10)31 self.record_entry = tk.Entry(self.top, state="readonly", width=50)32 self.record_entry.grid(row=6, column=2)33 self.btn_del = ttk.Button(self.top, text="Delete Record", command=self.delete_btn, style="W.TButton")34 self.btn_del.grid(row=7, column=2, pady=20)35 self.delete_lbl = tk.Label(self.top, text=" ", bg=Style.bg, font=("calibri"))36 self.delete_lbl.grid(row=8, column=2)37 def load_record(self):38 if self.validate_input():39 record = self.file.find_record(int(self.select_record.get())).fetchone()40 self.record_entry.configure(state="normal")41 self.record_entry.delete(0, tk.END)42 self.record_entry.insert(0, str(record))43 self.record_entry.configure(state="readonly")44 def delete_btn(self):45 if self.delete_record(): # deletes the record46 self.delete_lbl.configure(text="Record has been deleted \nPlease close this window") # feedback for user47 self.menu.refresh_btn()48 self.top.destroy()49 def validate_input(self):50 user_in = self.select_record.get()51 valid = False52 if len(user_in) == 0:53 self.delete_lbl.configure(text="Enter A Record ID")54 else:55 try:56 user_in = int(user_in)57 if not self.file.cursor.execute("SELECT * FROM records WHERE id = ?;", (user_in,)):58 self.delete_lbl.configure(text="Record does not exist")59 else:60 valid = True61 except ValueError:62 self.delete_lbl.configure(text="Invalid Record ID")63 return valid64 def delete_record(self):65 if self.validate_input():66 del_rec = self.select_record.get() # gets the user input for what record67 self.file.cursor.execute("DELETE FROM records WHERE id = ?", (del_rec,))68 self.file.db.commit()...

Full Screen

Full Screen

recorder.py

Source:recorder.py Github

copy

Full Screen

1from django.http import HttpResponse, HttpResponseBadRequest2from amcat.models.record import Record3from amcat.models.article import Article4from amcat.models.coding.codingjob import CodingJob5import dateutil.parser6import json7def record(request):8 record_list = json.loads(request.body)9 for record_entry in record_list:10 record_entry['user'] = request.user11 record_entry['ts'] = dateutil.parser.parse(record_entry['ts'])12 if 'article_id' in record_entry:13 article = Article.objects.get(id=record_entry['article_id'])14 record_entry['article'] = article15 del record_entry['article_id']16 if 'codingjob_id' in record_entry:17 codingjob = CodingJob.objects.get(id=record_entry['codingjob_id'])18 record_entry['codingjob'] = codingjob19 del record_entry['codingjob_id']20 Record.objects.create(**record_entry) ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful