Best Python code snippet using autotest_python
core_helpers.py
Source:core_helpers.py  
#!/usr/bin/python

# Copyright (c) 2009, Purdue University
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright notice, this
# list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# Neither the name of the Purdue University nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Core helper functions."""

__copyright__ = 'Copyright (C) 2009, Purdue University'
__license__ = 'BSD'
__version__ = '#TRUNK#'


import constants
import errors
import helpers_lib

import datetime
import dns.zone
import IPy


class CoreHelpers(object):
  """Library of helper functions that extend the core functions."""

  def __init__(self, core_instance):
    """Sets up core instance

    Inputs:
       core_instance: instance of RosterCore
    """
    self.core_instance = core_instance
    self.db_instance = core_instance.db_instance
    self.user_instance = core_instance.user_instance
    self.log_instance = core_instance.log_instance

  ### These functions just expose helpers_lib functions for the
  ### XML-RPC server. For doc strings see helpers_lib
  def ListGroupPermissions(self):
    return self.db_instance.data_validation_instance.ListGroupPermissions()

  def ReverseIP(self, ip_address):
    return helpers_lib.ReverseIP(ip_address)

  def UnReverseIP(self, ip_address):
    return helpers_lib.UnReverseIP(ip_address)

  def CIDRExpand(self, cidr_block, begin=None, end=None):
    return helpers_lib.CIDRExpand(cidr_block, begin, end)

  def ExpandIPV6(self, ip_address):
    return helpers_lib.ExpandIPV6(ip_address)

  def GetViewsByUser(self, username):
    """Lists view names available to given username

    Inputs:
        username: string of user name

    Outputs:
        set: set of view name strings
    """
    function_name, _ = helpers_lib.GetFunctionNameAndArgs()
    self.user_instance.Authorize(function_name)
    views = set()
    users_dict = self.db_instance.GetEmptyRowDict('users')
    users_dict['user_name'] = username
    user_group_assignments_dict = self.db_instance.GetEmptyRowDict(
        'user_group_assignments')
    groups_dict = self.db_instance.GetEmptyRowDict('groups')
    forward_zone_permissions_dict = self.db_instance.GetEmptyRowDict(
        'forward_zone_permissions')
    zones_dict = self.db_instance.GetEmptyRowDict('zones')
    zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(
        'zone_view_assignments')
    self.db_instance.StartTransaction()
    try:
      joined_list = self.db_instance.ListRow(
          'users', users_dict, 'user_group_assignments',
          user_group_assignments_dict,
          'groups', groups_dict, 'forward_zone_permissions',
          forward_zone_permissions_dict, 'zones', zones_dict,
          'zone_view_assignments', zone_view_assignments_dict)
    finally:
      self.db_instance.EndTransaction()
    for view_dict in joined_list:
      # View dependencies are stored as '<view>_dep'; keep the view part only.
      views.add(view_dict['zone_view_assignments_view_dependency'].split(
          '_dep')[0])
    return views

  def AddFormattedRecords(self, zone_name, zone_file_string,
                          view):
    """Adds records from a string of a partial zone file

    Inputs:
      zone_name: string of zone name
      zone_file_string: string of the file contents
      view: string of view name

    Outputs:
      int: Amount of records added to db.
    """
    all_views = self.core_instance.ListZones(zone_name=zone_name)[
        zone_name]
    origin = all_views[view][u'zone_origin']
    #If the file doesn't have an origin, we need to give it something
    #otherwise dns.zone will raise an UnknownOrigin exception
    if( "$ORIGIN" not in zone_file_string ):
      zone = dns.zone.from_text(str(zone_file_string), check_origin=False,
                                origin=origin)
    else:
      zone = dns.zone.from_text(str(zone_file_string), check_origin=False)
    make_record_args_list = helpers_lib.CreateRecordsFromZoneObject(
        zone, zone_name=zone_name, view_name=view, zone_origin=origin,
        views_list=all_views)
    self.ProcessRecordsBatch(add_records=make_record_args_list,
                             zone_import=True)
    return len(make_record_args_list)

  def GetCIDRBlocksByView(self, view, username):
    """Lists CIDR blocks available to a username in a given view

    Inputs:
        view: string of view name
        username: string of user name

    Outputs:
        set: set of cidr block strings
    """
    self.user_instance.Authorize('GetCIDRBlocksByView')
    cidrs = set()
    users_dict = self.db_instance.GetEmptyRowDict('users')
    users_dict['user_name'] = username
    views_dict = self.db_instance.GetEmptyRowDict('views')
    views_dict['view_name'] = view
    user_group_assignments_dict = self.db_instance.GetEmptyRowDict(
        'user_group_assignments')
    groups_dict = self.db_instance.GetEmptyRowDict('groups')
    reverse_range_permissions_dict = self.db_instance.GetEmptyRowDict(
        'reverse_range_permissions')
    zones_dict = self.db_instance.GetEmptyRowDict('zones')
    reverse_range_zone_assignments_dict = self.db_instance.GetEmptyRowDict(
        'reverse_range_zone_assignments')
    zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(
        'zone_view_assignments')
    self.db_instance.StartTransaction()
    try:
      joined_list = self.db_instance.ListRow(
          'views', views_dict,
          'users', users_dict, 'user_group_assignments',
          user_group_assignments_dict,
          'groups', groups_dict, 'reverse_range_permissions',
          reverse_range_permissions_dict, 'zones', zones_dict,
          'zone_view_assignments', zone_view_assignments_dict,
          'reverse_range_zone_assignments', reverse_range_zone_assignments_dict)
    finally:
      self.db_instance.EndTransaction()
    for cidr_dict in joined_list:
      cidrs.add(cidr_dict['reverse_range_zone_assignments_cidr_block'])
    return cidrs

  def GetAssociatedCNAMEs(self, hostname, view_name, zone_name,
                          recursive=False):
    """Lists cname's by assignment hostname.

    Inputs:
      hostname: string of hostname
      view_name: string of view name
      zone_name: string of zone name
      recursive: boolean, also list the cnames that point at each cname
                 found, and so on

    Outputs:
      list: list of found cname dictionaries
    """
    self.user_instance.Authorize('GetAssociatedCNAMEs')
    record_arguments_record_assignments_dict = (
        self.db_instance.GetEmptyRowDict(
            'record_arguments_records_assignments'))
    zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(
        'zone_view_assignments')
    zone_view_assignments_dict['zone_view_assignments_zone_name'] = unicode(
        zone_name)
    if( view_name != u'any' ):
      view_dep = u'%s_dep' % view_name
    else:
      view_dep = u'any'
    zone_view_assignments_dict['zone_view_assignments_view_dependency'] = (
        view_dep)
    record_arguments_record_assignments_dict[
        'record_arguments_records_assignments_type'] = u'cname'
    record_arguments_record_assignments_dict[
        'argument_value'] = hostname
    records_dict = self.db_instance.GetEmptyRowDict(
        'records')
    records_dict['record_type'] = u'cname'
    records_dict['record_view_dependency'] = view_dep
    records_dict['record_zone_name'] = zone_name
    self.db_instance.StartTransaction()
    try:
      found_records = self.db_instance.ListRow(
          'records', records_dict,
          'record_arguments_records_assignments',
          record_arguments_record_assignments_dict,
          'zone_view_assignments', zone_view_assignments_dict)
    finally:
      self.db_instance.EndTransaction()
    cnames = []
    for record in found_records:
      new_record = {}
      new_record['record_type'] = record[
          'record_arguments_records_assignments_type']
      new_record['zone_name'] = record['record_zone_name']
      new_record['target'] = record['record_target']
      new_record['ttl'] = record['record_ttl']
      new_record['view_name'] = record[
          'record_view_dependency'].rsplit('_dep')[0]
      new_record['assignment_host'] = record['argument_value']
      new_record['last_user'] = record['record_last_user']
      new_record['zone_origin'] = record['zone_origin']
      cnames.append(new_record)
    if( not recursive ):
      return cnames
    # NOTE(review): nothing here tracks visited hostnames, so a CNAME loop in
    # the data would recurse until the interpreter's recursion limit.
    new_cnames = []
    for record in cnames:
      new_cnames.extend(self.GetAssociatedCNAMEs(
          '%s.%s' % (record['target'], record['zone_origin']),
          record['view_name'], record['zone_name'], recursive=recursive))
    cnames.extend(new_cnames)
    return cnames

  def ListLatestNamedConfig(self, dns_server_set):
    """Lists the latest named config string given dns server set

    This function is duplicated in
    roster-config-manager/roster_config_manager/tree_exporter.py

    Inputs:
      dns_server_set: string of dns server set name

    Outputs:
      dict: dictionary of latest named config, or None if none was found
    """
    named_configs = self.core_instance.ListNamedConfGlobalOptions(
        dns_server_set=dns_server_set)
    current_timestamp = datetime.datetime.now()
    smallest_time_differential = datetime.timedelta(weeks=100000)
    newest_config = None
    for named_config in named_configs:
      # The config whose timestamp is closest to now is the newest one.
      time_differential = current_timestamp - named_config['timestamp']
      if( time_differential < smallest_time_differential ):
        smallest_time_differential = time_differential
        newest_config = named_config
    return newest_config

  def RevertNamedConfig(self, dns_server_set, option_id):
    """Revert a Named Config file

    Inputs:
      option_id: the id of config to replicate
      dns_server_set: string of dns server set name

    Raises:
      InvalidInputError: DNS server set does not contain id.
      UnexpectedDataError: Multiple configurations found.
    """
    named_config = self.core_instance.ListNamedConfGlobalOptions(
        dns_server_set=dns_server_set, option_id=option_id)
    if( len(named_config) == 0 ):
      raise errors.InvalidInputError(
          'DNS server set "%s" does not contain id "%s"' % (
              dns_server_set, option_id))
    elif( len(named_config) == 1 ):
      self.core_instance.MakeNamedConfGlobalOption(
          dns_server_set, named_config[0]['options'])
    else:
      raise errors.UnexpectedDataError('Multiple configurations found.')

  def MakeAAAARecord(self, target, zone_name, record_args_dict,
                     view_name=None, ttl=None):
    """Makes an AAAA record.

    The 'assignment_ip' entry of record_args_dict is rewritten in place to
    its full-size form before the record is made.

    Inputs:
      target: string of target
      zone_name: string of zone name
      record_args_dict: dictionary of record arguments
      view_name: string of view name
      ttl: time to live
    """
    record_args_dict['assignment_ip'] = unicode(IPy.IP(record_args_dict[
        'assignment_ip']).strFullsize())
    self.core_instance.MakeRecord(u'aaaa', target, zone_name, record_args_dict,
                                  view_name, ttl)

  def MakeSubdomainDelegation(self, zone_name, subdomain_name, nameserver,
                              view_name=u'any'):
    """Makes a Delegated Subdomain

    Assumes delegation zone is created

    Inputs:
      view_name: string of view name
      zone_name: string of zone name
      subdomain_name: string of subdomain name
      nameserver: string of fully qualified nameserver

    Raises:
      InvalidInputError: Zone does not exist.
    """
    self.core_instance.MakeRecord(u'ns', subdomain_name, zone_name,
                                  {u'name_server': nameserver}, view_name)

  def GetPTRTarget(self, long_target, view_name=u'any'):
    """Gets the short PTR target given the long PTR target

    Inputs:
      long_target: String of long PTR target
      view_name: String of view name

    Raises:
      InvalidInputError: No suitable reverse range zone assignments found.

    Outputs:
      tuple: (string of short PTR target, zone assignment it belongs to)
    """
    if( not long_target.endswith('in-addr.arpa.') and not
        long_target.endswith('ip6.arpa.') ):
      long_target = self.ReverseIP(long_target)
    reverse_range_zone_assignments = (
        self.core_instance.ListReverseRangeZoneAssignments())
    ip_address = IPy.IP(self.UnReverseIP(long_target))
    # for/else: the else branch only runs when no assignment matched.
    for zone_assignment in reverse_range_zone_assignments:
      if( ip_address in IPy.IP(
          reverse_range_zone_assignments[zone_assignment]) ):
        break
    else:
      raise errors.InvalidInputError(
          'No suitable reverse range zone assignments found.')
    zone_detail = self.core_instance.ListZones(view_name=view_name)
    zone_origin = zone_detail[zone_assignment][view_name]['zone_origin']
    # Count number of characters in zone origin, add one to count the extra
    # period and remove that number of characters from the target.
    zone_origin_length = len(zone_origin) + 1
    short_target = long_target[:-zone_origin_length]
    return (short_target, zone_assignment)

  def MakePTRRecord(self, target, record_args_dict,
                    view_name=u'any', ttl=None):
    """Makes a ptr record.

    A leading '@.' on the 'assignment_host' entry of record_args_dict is
    removed in place before the record is made.

    Inputs:
      target: string of target
      record_args_dict: dictionary of record arguments
      view_name: string of view name
      ttl: string of ttl
    """
    target, zone_assignment = self.GetPTRTarget(target, view_name)
    if( record_args_dict['assignment_host'].startswith('@.') ):
      # Slice instead of lstrip('@.'): lstrip strips a *set* of characters
      # and would also eat further leading '@' or '.' characters of the host.
      record_args_dict['assignment_host'] = record_args_dict[
          'assignment_host'][2:]
    self.core_instance.MakeRecord(u'ptr', target, zone_assignment,
                                  record_args_dict, view_name, ttl)

  def RemovePTRRecord(self, record_type, target, zone_name, record_args_dict,
                      view_name, ttl=None):
    """Removes a ptr record.

    A leading '@.' on the 'assignment_host' entry of record_args_dict is
    removed in place before the record is removed.

    Inputs:
      record_type: string of record type
      target: string of target
      zone_name: string of zone name
      record_args_dict: dictionary of record arguments
      view_name: string of view name
      ttl: string of ttl
    """
    if( record_args_dict['assignment_host'].startswith('@.') ):
      # Slice instead of lstrip('@.'): lstrip strips a *set* of characters
      # and would also eat further leading '@' or '.' characters of the host.
      record_args_dict['assignment_host'] = record_args_dict[
          'assignment_host'][2:]
    self.core_instance.RemoveRecord(record_type, target, zone_name,
                                    record_args_dict, view_name, ttl)

  def MakeIPv4ClasslessReverseDelegation(self, name_server, cidr_block,
                                         view_name=u'any', ttl=None):
    """Creates classless forwarding for reverse DNS lookups

    Adds one NS record for the block plus one CNAME record per usable
    address in the block, in a single ProcessRecordsBatch call.

    Inputs:
      name_server: nameserver to add for reverse delegation
      cidr_block: string of CIDR block
      view_name: string of view for the reverse zone, defaults to 'any'
      ttl: time-to-live for the newly added records, defaults to zone ttl

    Raises:
      InvalidInputError: cidr block does not have four octets
      InvalidInputError: last octet or netmask missing, non-numeric or out
                         of range (netmask must be between 25 and 31)
      InvalidInputError: no zone includes the cidr block
      InvalidInputError: existing record(s) overlap the cidr block
    """
    # NOTE(review): other methods in this class use u'any' (no '_dep'
    # suffix) as the dependency of the 'any' view - confirm 'any_dep' is
    # what ProcessRecordsBatch expects here.
    view_dep = '%s_dep' % view_name
    cidr_octets = cidr_block.split('.')
    num_octets = len(cidr_octets)
    if( num_octets != 4 ):
      raise errors.InvalidInputError('Invalid CIDR octet number')
    cidr_block_target = cidr_octets[-1]
    target_parts = cidr_block_target.split('/')
    if( len(target_parts) != 2 ):
      # Without this check a missing '/' surfaced as a bare IndexError.
      raise errors.InvalidInputError('Invalid CIDR last octet/netmask')
    broadcast_last_octet, netmask = target_parts
    if( not (str.isdigit(str(broadcast_last_octet)) and
             str.isdigit(str(netmask))) ):
      raise errors.InvalidInputError('Invalid CIDR last octet/netmask')
    if( int(netmask) < 25 or int(netmask) > 31 ):
      raise errors.InvalidInputError('Invalid CIDR netmask: %s' % netmask)
    if( int(broadcast_last_octet) > 255 ):
      raise errors.InvalidInputError('Invalid CIDR last octet')
    # Validate every leading octet, including the first one.
    for octet in cidr_octets[:-1]:
      if( not str.isdigit(str(octet)) or int(octet) > 255 ):
        raise errors.InvalidInputError('Invalid CIDR octet %s' % octet)
    cidr_for_ipy = '%s.0/%s' % ('.'.join(cidr_octets[:-1]), netmask)
    expanded_cidr = self.CIDRExpand(cidr_for_ipy,
                                    begin=long(broadcast_last_octet))
    expanded_cidr.remove(expanded_cidr[-1])
    zone_name = self.ListZoneByIPAddress(cidr_block)
    if( zone_name is None ):
      raise errors.InvalidInputError(
          'ERROR: zone that includes cidr block %s not found' % cidr_block)
    records = self.core_instance.ListRecords(zone_name=zone_name)
    # First label of each reversed address, i.e. the last octet of the IP.
    block_targets = set(
        self.ReverseIP(ip).split('.')[0] for ip in expanded_cidr)
    for record in records:
      if( record['target'] in block_targets ):
        raise errors.InvalidInputError('ERROR: existing record(s) with '
                                       'target: %s overlaps given cidr: %s' %
                                       (record['target'], cidr_block))
    records_batch = []
    cidr_last_target = int(broadcast_last_octet) + pow(2, 32 - int(netmask))
    ns_target = cidr_block_target
    ns_args_dict = self.core_instance.GetEmptyRecordArgsDict(u'ns')
    ns_args_dict['name_server'] = name_server
    ns_record = {'record_type': u'ns', 'record_target': ns_target,
                 'record_zone_name': zone_name, 'view_name': view_name,
                 'record_view_dependency': view_dep,
                 'record_arguments': ns_args_dict}
    records_batch.append(ns_record)
    for ip in expanded_cidr:
      split_reverse_ip = self.ReverseIP(ip).split('.')
      target = split_reverse_ip[0]
      # Skip the first and the last address of the block.
      if( target == broadcast_last_octet or target == unicode(
          cidr_last_target - 1) ):
        continue
      # <octet>.<last octet>/<netmask>.<rest of the reversed address>
      reverse_ip_for_record = '.'.join(
          [target, cidr_block_target] + split_reverse_ip[1:])
      cname_args_dict = self.core_instance.GetEmptyRecordArgsDict(u'cname')
      cname_args_dict['assignment_host'] = unicode(reverse_ip_for_record)
      cname_record = {'record_type': u'cname', 'record_target': unicode(target),
                      'record_zone_name': zone_name, 'view_name': view_name,
                      'record_view_dependency': view_dep,
                      'record_arguments': cname_args_dict}
      records_batch.append(cname_record)
    self.ProcessRecordsBatch(add_records=records_batch)

  def MakeIPv4ClasslessReverseDelegatedTargetZone(self, cidr_block):
    """Creates a delegated reverse zone

    The zone is named after the reversed octets of the block, keeping the
    '<last octet>/<netmask>' label, followed by 'in-addr.arpa'.

    Inputs:
      cidr_block: string of IPv4 cidr block

    Raises:
      InvalidInputError: Not a valid cidr block
    """
    cidr_octets = cidr_block.split('.')
    if( len(cidr_octets) != 4 ):
      raise errors.InvalidInputError('Invalid CIDR block')
    target_parts = cidr_octets[-1].split('/')
    if( len(target_parts) != 2 ):
      # Without this check a missing '/' surfaced as a bare IndexError.
      raise errors.InvalidInputError('Invalid CIDR block')
    broadcast_last_octet, netmask = target_parts
    if( not (str.isdigit(str(broadcast_last_octet)) and
             str.isdigit(str(netmask))) ):
      raise errors.InvalidInputError('Invalid CIDR block')
    if( int(netmask) < 25 or int(netmask) > 31 or
        int(broadcast_last_octet) > 255 ):
      raise errors.InvalidInputError('Invalid CIDR block')
    # Validate every leading octet, including the first one.
    for octet in cidr_octets[:-1]:
      if( not str.isdigit(str(octet)) or int(octet) > 255 ):
        raise errors.InvalidInputError('Invalid CIDR block')
    zone_name = u'%s.in-addr.arpa' % u'.'.join(reversed(cidr_octets))
    zone_type = u'master'
    zone_origin = u'%s.' % zone_name
    self.core_instance.MakeZone(zone_name, zone_type, zone_origin)

  def ListAccessLevels(self):
    """Lists access levels from constants for both integer and string keys

    Outputs:
      dict: dictionary of access levels with both string and integer-string keys

    Example:
      {'32': 32, '64': 64, '128': 128, 'user': 32, 'unlocked_user': 64,
       'dns_admin': 128}
    """
    access_levels_dict = {}
    for key, value in constants.ACCESS_LEVELS.iteritems():
      access_levels_dict[str(value)] = value
      access_levels_dict[key] = value
    return access_levels_dict

  def ListAvailableIpsInCIDR(self, cidr_block, num_ips=1, view_name=None,
                             zone_name=None):
    """Finds first available ips. Only lists as many IPs as are available.

    Returns empty list if no IPs are available in given cidr block and a
    truncated list if only a portion of IPs are available.

    Inputs:
      cidr_block: string of ipv4 or ipv6 cidr block
      num_ips: maximum number of ip addresses to list
      view_name: string of view name, passed to ListRecordsByCIDRBlock
      zone_name: string of zone name, passed to ListRecordsByCIDRBlock

    Raises:
      InvalidInputError: IP is in a reserved IP space.
      InvalidInputError: Not a valid cidr block

    Outputs:
      list: list of strings of ip addresses
    """
    try:
      cidr_block_ipy = IPy.IP(cidr_block)
    except ValueError:
      raise errors.InvalidInputError(
          '%s is not a valid cidr block' % cidr_block)
    reserved = []
    if( cidr_block_ipy.version() == 6 ):
      reserved = constants.RESERVED_IPV6
    elif( cidr_block_ipy.version() == 4 ):
      reserved = constants.RESERVED_IPV4
    for reserved_cidr in reserved:
      if( cidr_block_ipy in IPy.IP(reserved_cidr) ):
        raise errors.InvalidInputError(
            '%s is in a reserved IP space' % cidr_block)
    records = self.ListRecordsByCIDRBlock(cidr_block, view_name=view_name,
                                          zone_name=zone_name)
    # An address is taken as soon as any view has a record for it.
    taken_ips = set()
    for view in records:
      taken_ips.update(records[view])
    avail_ips = []
    block_size = cidr_block_ipy.len()
    count = 0
    while( count < block_size and len(avail_ips) < num_ips ):
      candidate_ip = cidr_block_ipy[count].strFullsize()
      if( candidate_ip not in taken_ips ):
        avail_ips.append(candidate_ip)
      count += 1
    return avail_ips

  def ListRecordsByCIDRBlock(self, cidr_block, view_name=None, zone_name=None):
    """Lists records in a given cidr block.

    Inputs:
      cidr_block: string of ipv4 or ipv6 cidr block
      view_name: string of the view
      zone_name: string of the zone

    Raises:
      InvalidInputError: The CIDR block specified does not contain a valid IP
      IPIndexError: Record type not indexable by IP
      IPIndexError: Record type unknown. Missing ipv4 or ipv6 dec index

    Outputs:
      dict: A dictionary Keyed by view, keyed by IP, listed by record.
            Forward (a/aaaa) records are appended to an IP's list and ptr
            records are inserted at its front.
            example:
                {u'test_view':
                    {u'192.168.1.8':
                        [{u'forward': False,
                          u'host': u'host6.university.edu',
                          u'record_type': u'ptr',
                          u'record_zone_name': u'reverse_zone',
                          u'zone_origin': u'1.168.192.in-addr.arpa.',
                          ...},
                         {u'forward': True,
                          u'host': u'host6.university.edu',
                          u'record_type': u'a',
                          u'record_zone_name': u'forward_zone',
                          u'zone_origin': u'university.edu.',
                          ...}]}}
    """
    self.user_instance.Authorize('ListRecordsByCIDRBlock')
    record_list = {}
    try:
      IPy.IP(cidr_block)
    except ValueError:
      raise errors.InvalidInputError(
          'The CIDR block specified does not contain a valid IP: %s' % (
              cidr_block))
    cidr_block = IPy.IP(cidr_block).strFullsize(1)
    if( cidr_block.find('/') != -1 ):
      cidr_ip = IPy.IP(cidr_block.split('/')[0])
      cidr_size = int(cidr_block.split('/')[1])
    else:
      cidr_ip = IPy.IP(cidr_block)
      if( cidr_ip.version() == 4 ):
        cidr_size = 32
      elif( cidr_ip.version() == 6 ):
        cidr_size = 128
      else:
        raise errors.InvalidInputError(
            'The CIDR block specified does not contain a valid IP: %s' % (
                cidr_block))
    records_dict = self.db_instance.GetEmptyRowDict('records')
    zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(
        'zone_view_assignments')
    zone_dict = self.db_instance.GetEmptyRowDict('zones')
    record_arguments_records_assignments_dict = (
        self.db_instance.GetEmptyRowDict(
            'record_arguments_records_assignments'))
    if( (view_name is not None and view_name.endswith('_dep')) or
        view_name == u'any' ):
      records_dict['record_view_dependency'] = view_name
    elif( view_name is not None ):
      records_dict['record_view_dependency'] = '%s_dep' % view_name
    zone_dict['zone_name'] = zone_name

    if( cidr_ip.version() == 4 ):
      decimal_ip = int( cidr_ip.strDec() )
      # Clear the host bits for the lower bound, set them for the upper one.
      decimal_ip_lower = (
          (decimal_ip >> (32 - cidr_size) ) << (32 - cidr_size))
      decimal_ip_upper = ( pow(2, 32 - cidr_size) - 1 ) | decimal_ip
      ip_index_dict = self.db_instance.GetEmptyRowDict('ipv4_index')
      self.db_instance.StartTransaction()
      try:
        record_list = self.db_instance.ListRow(
            'ipv4_index', ip_index_dict,
            'records', records_dict,
            'zones', zone_dict,
            'zone_view_assignments', zone_view_assignments_dict,
            'record_arguments_records_assignments',
            record_arguments_records_assignments_dict,
            column='ipv4_dec_address',
            range_values=(decimal_ip_lower, decimal_ip_upper))
      finally:
        self.db_instance.EndTransaction()
    elif( cidr_ip.version() == 6 ):
      ip_index_dict = self.db_instance.GetEmptyRowDict('ipv6_index')
      # The index stores an IPv6 address as two 64-bit halves. Split the
      # address arithmetically: slicing its hex string at a fixed offset
      # breaks whenever the address has leading zero nibbles (e.g. '::1').
      decimal_ip = int( cidr_ip.strDec() )
      decimal_ip_upper = decimal_ip >> 64
      decimal_ip_lower = decimal_ip & (pow(2, 64) - 1)
      if( cidr_size >= 64 ):
        # Upper half is fixed; the range is over the lower half.
        ip_index_dict[u'ipv6_dec_upper'] = decimal_ip_upper
        decimal_ip_lower_lower = (
            (decimal_ip_lower >> (128 - cidr_size)) <<
            (128 - cidr_size))
        decimal_ip_lower_upper = (
            (pow(2, 128 - cidr_size) - 1 ) | decimal_ip_lower)
        column = 'ipv6_dec_lower'
        range_values = (decimal_ip_lower_lower, decimal_ip_lower_upper)
      else:
        # The range is over the upper half; the lower half is unconstrained.
        decimal_ip_upper_lower = (
            (decimal_ip_upper >> (64 - cidr_size)) << (64 - cidr_size))
        decimal_ip_upper_upper = (
            (pow(2, 64 - cidr_size) - 1 ) | decimal_ip_upper)
        column = 'ipv6_dec_upper'
        range_values = (decimal_ip_upper_lower, decimal_ip_upper_upper)
      self.db_instance.StartTransaction()
      try:
        record_list = self.db_instance.ListRow(
            'ipv6_index', ip_index_dict,
            'records', records_dict,
            'zones', zone_dict,
            'zone_view_assignments', zone_view_assignments_dict,
            'record_arguments_records_assignments',
            record_arguments_records_assignments_dict,
            column=column,
            range_values=range_values)
      finally:
        self.db_instance.EndTransaction()

    ## Parse returned list
    parsed_record_dict = {}
    for record_entry in record_list:
      record_type = record_entry[u'record_type']
      if( record_type not in constants.RECORD_TYPES_INDEXED_BY_IP ):
        raise errors.IPIndexError('Record type not indexable by '
                                  'IP: %s' % record_entry)
      view_dependency = record_entry[u'record_view_dependency']
      if( view_dependency.endswith('_dep') ):
        record_view = view_dependency[:-4]
      else:
        record_view = view_dependency
      if( u'ipv4_dec_address' in record_entry ):
        record_ip = u'%s' % (
            IPy.IP(record_entry[u'ipv4_dec_address']).strNormal(1))
      elif( u'ipv6_dec_upper' in record_entry ):
        decimal_ip = (
            (record_entry[u'ipv6_dec_upper'] << 64) +
            (record_entry[u'ipv6_dec_lower']) )
        record_ip = u'%s' % IPy.IP(decimal_ip).strFullsize(0)
      else:
        raise errors.IPIndexError(
            'Record type unknown. Missing ipv4 or ipv6 dec index: %s' % (
                record_entry))
      ip_records = parsed_record_dict.setdefault(
          record_view, {}).setdefault(record_ip, [])
      record_item = {
          'records_id': record_entry['records_id'],
          'record_type': record_type,
          'record_target': record_entry['record_target'],
          'record_ttl': record_entry['record_ttl'],
          'record_zone_name': record_entry['record_zone_name'],
          u'zone_origin': record_entry[u'zone_origin'],
          'record_view_dependency': view_dependency,
          'record_last_user': record_entry['record_last_user'],
          u'view_name': record_view}
      if( record_type == u'a' or record_type == u'aaaa' ):
        record_item[u'forward'] = True
        # zone_origin ends with a period; drop it from the host name.
        record_item[u'host'] = '%s.%s' % (
            record_entry[u'record_target'],
            record_entry[u'zone_origin'][:-1])
        record_item[u'record_args_dict'] = {
            'assignment_ip': record_entry['argument_value']}
        ip_records.append( record_item )
      elif( record_type == u'ptr' ):
        record_item[u'forward'] = False
        record_item[u'host'] = record_entry[u'argument_value'][:-1]
        assignment_ip = helpers_lib.UnReverseIP(
            '%s.%s' % (
                record_entry['record_target'], record_entry['zone_origin']))
        record_item[u'record_args_dict'] = {'assignment_ip': assignment_ip}
        # Reverse records go in front of any forward records for the IP.
        ip_records.insert(0, record_item )
    return parsed_record_dict

  def ListRecordsByZone(self, zone_name, view_name=None):
    """Lists records in a given zone.

    Inputs:
      zone_name: name of the zone
      view_name: name of the view

    Output:
      dict: A dictionary Keyed by view, keyed by IP, listed by record.
            example:
                {u'test_view':
                    {u'192.168.1.8':
                        [{u'forward': True,
                          u'host': u'host6.university.edu',
                          u'zone': u'forward_zone',
                          u'zone_origin': u'university.edu.'},
                         {u'forward': False,
                          u'host': u'host6.university.edu',
                          u'zone': u'reverse_zone',
                          u'zone_origin': u'1.168.192.in-addr.arpa.'}]}}
    """
    self.user_instance.Authorize('ListRecordsByZone')
    record_list = {}

    records_dict = self.db_instance.GetEmptyRowDict('records')
    zone_view_assignments_dict = self.db_instance.GetEmptyRowDict(
        'zone_view_assignments')
    zone_dict = self.db_instance.GetEmptyRowDict('zones')
    zone_dict['zone_name'] = zone_name
    record_arguments_records_assignments_dict = (
        self.db_instance.GetEmptyRowDict(
            'record_arguments_records_assignments'))
    ipv4_index_dict = self.db_instance.GetEmptyRowDict('ipv4_index')
    ipv6_index_dict = self.db_instance.GetEmptyRowDict('ipv6_index')
    if( view_name is not None and
        view_name.endswith('_dep') or view_name == u'any' ):
      records_dict['record_view_dependency'] = view_name
    elif( view_name is not None ):
      records_dict['record_view_dependency'] = '%s_dep' % view_name
    args_ipv4 = ['zone_view_assignments', zone_view_assignments_dict,
                 'zones', zone_dict, 'records', records_dict,
                 'record_arguments_records_assignments',
                 record_arguments_records_assignments_dict]
    args_ipv6 = args_ipv4 + ['ipv6_index', ipv6_index_dict]
    args_ipv4.append('ipv4_index')
    args_ipv4.append(ipv4_index_dict)
    self.db_instance.StartTransaction()
    try:
      record_list = self.db_instance.ListRow(*args_ipv4)
      record_list = record_list + self.db_instance.ListRow(*args_ipv6)
    finally:
      self.db_instance.EndTransaction()
    #Parsing Records
    parsed_record_dict = {}
    for record_entry in record_list:
      if( record_entry[u'record_type'] not in
          constants.RECORD_TYPES_INDEXED_BY_IP ):
        raise errors.IPIndexError('Record type not indexable by '
                                  'IP: %s' % record_entry)
      if( record_entry[u'record_view_dependency'].endswith('_dep') ):
        record_view = record_entry[u'record_view_dependency'][:-4]
      else:
        record_view = record_entry[u'record_view_dependency']
      if( record_view not in parsed_record_dict ):
          parsed_record_dict[record_view] = {}
      if( u'ipv4_dec_address' in record_entry ):
        record_ip = u'%s' % (
            IPy.IP(record_entry[u'ipv4_dec_address']).strNormal(1))
        if( record_ip not in parsed_record_dict[record_view] ):
          parsed_record_dict[record_view][record_ip] = []
      elif( u'ipv6_dec_upper' in record_entry ):
        decimal_ip = (
            (record_entry[u'ipv6_dec_upper'] << 64) +
            (record_entry[u'ipv6_dec_lower']) )
        record_ip = u'%s' % IPy.IP(decimal_ip).strFullsize(0)
        if( record_ip not in parsed_record_dict[record_view] ):
          parsed_record_dict[record_view][record_ip] = []
  else:790        raise errors.IPIndexError(791            'Record type unknown. Missing ipv4 or ipv6 dec index: %s' % (792            record_entry))793      record_item = {}794      record_item['records_id'] = record_entry['records_id']795      record_item['record_type'] = record_entry['record_type']796      record_item['record_target'] = record_entry['record_target']797      record_item['record_ttl'] = record_entry['record_ttl']798      record_item['record_zone_name'] = record_entry['record_zone_name']799      record_item[u'zone_origin'] = record_entry[u'zone_origin']800      record_item['record_view_dependency'] = record_entry[801                                              'record_view_dependency']802      record_item['record_last_user'] = record_entry['record_last_user']803      if record_entry[u'record_view_dependency'].endswith('_dep'):804        record_item[u'view_name'] = record_entry[u'record_view_dependency'][:-4]805      else:806        record_item[u'view_name'] = record_entry[u'record_view_dependency']807      if( record_entry[u'record_type'] == u'a' or808          record_entry[u'record_type'] == u'aaaa' ):809        record_item[u'forward'] = True810        record_item[u'host'] = '%s.%s' % (811            record_entry[u'record_target'],812            record_entry[u'zone_origin'][:-1])813        record_item[u'zone_origin'] = record_entry['zone_origin']814        record_item[u'record_target'] = record_entry['record_target']815        record_item[u'record_args_dict'] = {816            'assignment_ip': record_entry['argument_value']}817        parsed_record_dict[record_view][record_ip].append(record_item)818      elif( record_entry[u'record_type'] == u'ptr' ):819        record_item[u'zone_origin'] = record_entry['zone_origin']820        record_item[u'record_target'] = record_entry['record_target']821        record_item[u'forward'] = False822        record_item[u'host'] = record_entry[u'argument_value'][:-1]823        assignment_ip = 
helpers_lib.UnReverseIP(824            '%s.%s' % (825                record_entry['record_target'],record_entry['zone_origin']))826        record_item[u'record_args_dict'] = {'assignment_ip': assignment_ip}827        parsed_record_dict[record_view][record_ip].insert(0, record_item )828    return parsed_record_dict829  def SortRecordsByHost(self, records_dict):830    """Generates an IP list sorted by record's host831    Inputs:832      record_dict: dictionary keyed by view, then keyed by IP833                   dictionary from ListRecordsByCIDRBlock834                   and from ListRecordsByZone835    Outputs:836      sorted_list: list of sorted records837    """838    sorted_list = []839    target_sort = []840    for view in records_dict:841      for ip in records_dict[view]:842        for record in records_dict[view][ip]:843          target_sort.append(dict({'ip_address':IPy.IP(ip).strFullsize()}.items() + 844                                  record.items()))845    sorted_list = sorted(target_sort, key=lambda x: x['host'])846    return sorted_list847  def ListNamedConfGlobalOptionsClient(self, option_id=None,848                                       dns_server_set=None, timestamp=None):849    """Converts XMLRPC datetime to datetime object and runs850    ListNamedConfGlobalOptions851    Inputs:852      option_id: integer of the option id853      dns_server_set: string of the dns server set name854      timestamp: XMLRPC datetime timestamp855    Outputs:856      list: list of dictionarires from ListNamedConfGlobalOptions857    """858    success = False859    named_conf_global_options = None860    named_conf_global_options = self.core_instance.ListNamedConfGlobalOptions(861        option_id, dns_server_set, timestamp)862    success = True863    return named_conf_global_options864  def ListZoneByIPAddress(self, ip_address):865    """Lists zone name given ip_address866    Inputs:867      ip_address: string of ip address868    Outputs:869      string: string of zone 
name, ex: 'test_zone'870    """871    user_ip = IPy.IP(ip_address)872    reverse_range_zone_assignments = (873        self.core_instance.ListReverseRangeZoneAssignments())874    for reverse_range_zone_assignment in reverse_range_zone_assignments:875      db_cidr = IPy.IP(reverse_range_zone_assignments[876          reverse_range_zone_assignment])877      if( user_ip in db_cidr ):878        return reverse_range_zone_assignment879  def RemoveCNamesByAssignmentHost(self, hostname, view_name, zone_name):880    """Removes cname's by assignment hostname, will not remove cnames881    that the user does not have permissin to remove. The function will continue882    and pass over that cname.883    Inputs:884      hostname: string of hostname885      view_name: string of view name886      zone_name: string of zone name887    888    Raises:889      UnexpectedDataError: Incorrect number of records found890    Outputs:891      int: number of rows modified892    """893    function_name, current_args = helpers_lib.GetFunctionNameAndArgs()894    row_count = 0895    record_arguments_record_assignments_dict = (896        self.db_instance.GetEmptyRowDict(897            'record_arguments_records_assignments'))898    record_arguments_record_assignments_dict[899        'record_arguments_records_assignments_type'] = u'cname'900    record_arguments_record_assignments_dict[901        'argument_value'] = hostname902    records_dict = self.db_instance.GetEmptyRowDict(903        'records')904    records_dict['record_type'] = u'cname'905    records_dict['record_view_dependency'] = '%s_dep' % view_name906    records_dict['record_zone_name'] = zone_name907    success = False908    try:909      self.db_instance.StartTransaction()910      try:911        found_record_arguments = self.db_instance.ListRow(912            'record_arguments_records_assignments',913            record_arguments_record_assignments_dict)914        remove_record_dict = {}915        for record_argument in 
found_record_arguments:916          remove_record_dict[record_argument[917              'record_arguments_records_assignments_record_id']] = {918                  'assignment_host': record_argument['argument_value']}919        for record_id in remove_record_dict:920          records_dict['records_id'] = record_id921          found_records_dict = self.db_instance.ListRow(922              'records', records_dict)923          if( len(found_records_dict) != 1 ):924            raise errors.UnexpectedDataError(925                'Incorrect number of records found!')926          try:927            self.core_instance.user_instance.Authorize(928                function_name,929                 record_data=930                     {'target': found_records_dict[0]['record_target'],931                      'zone_name': records_dict['record_zone_name'],932                      'view_name': records_dict['record_view_dependency'],933                      'record_type': records_dict['record_type']},934                current_transaction=True)935          except errors.AuthorizationError:936            continue937          row_count += self.db_instance.RemoveRow(938              'records', found_records_dict[0])939          remove_record_dict[record_id].update({940              'cname_host': found_records_dict[0]['record_target']})941      except:942        self.db_instance.EndTransaction(rollback=True)943        raise944      self.db_instance.EndTransaction()945      success = True946      log_list = []947      for record_id in remove_record_dict:948        log_list.append('record_id:')949        log_list.append(str(record_id))950        for record in remove_record_dict[record_id]:951          log_list.append('%s:' % record)952          log_list.append(remove_record_dict[record_id][record])953      success = True954    finally:955      self.log_instance.LogAction(self.user_instance.user_name, function_name,956                                  current_args, success)957    return 
row_count958  def ConstructRecordArgsDictFromRecordID(self, record_id):959    """Constructs the records_arg_dict from the Roster database given only 960    the record id.961    Inputs:962    record_id: int of record id963    Outputs:964    record_args_dict: dictionary of arguments and their values965    """966    record_args_db_row = self.db_instance.GetEmptyRowDict(967      'record_arguments_records_assignments')968    record_args_db_row[969        'record_arguments_records_assignments_record_id'] = record_id970    record_args_db_dict = self.db_instance.ListRow(971      'record_arguments_records_assignments', record_args_db_row)972    record_args_dict = {}973    for key in record_args_db_dict:974      key_entry = key['argument_value']975      if( key_entry.isdigit() ):976        key_entry = int(key_entry)977      record_args_dict[978        key['record_arguments_records_assignments_argument_name']] = key_entry979    return record_args_dict980  def ProcessRecordsBatch(self, delete_records=None, add_records=None,981                          zone_import=False):982    """Proccess batches of records983    Inputs:984      delete_records: list of dictionaries of records985                      ex: {'record_ttl': 3600, 'record_type': u'a',986                          'records_id': 10, 'record_target': u'host1',987                          'record_zone_name': u'forward_zone',988                          'record_last_user': u'sharrell',989                          'record_view_dependency': u'test_view_dep'}990                          {'record_type': 'ptr', 'record_target': 'target',991                          'view_name': 'view', 'zone_name': 'zone'}992      add_records: list of dictionaries of records993    994    Raises: 995      RecordsBatchError: Record specification too broad996      RecordsBatchError: No record found997      RecordsBatchError: Record already exists998      RecordsBatchError: CNAME already exists999      RecordsBatchError: Duplicate record found1000  
  Outputs:1001      int: row count1002    """1003    if delete_records is None:1004      delete_records = []1005    if add_records is None:1006      add_records = []1007    function_name, current_args = helpers_lib.GetFunctionNameAndArgs()1008    log_dict = {'delete': [], 'add': []}1009    row_count = 01010    changed_view_dep = []1011    success = False1012    try:1013      for record in add_records:1014        self.db_instance.ValidateRecordArgsDict(record[u'record_type'], 1015                                                record[u'record_arguments'])1016      self.db_instance.StartTransaction()1017      try:1018        # REMOVE RECORDS1019        for record in delete_records:1020          record_dict = self.db_instance.GetEmptyRowDict('records')1021          record_dict['records_id'] = record['records_id']1022          record_dict['record_type'] = record['record_type']1023          record_dict['record_target'] = record['record_target']1024          record_dict['record_ttl'] = record['record_ttl']1025          if( record['record_view_dependency'].endswith('_dep') or1026              record['record_view_dependency'] == u'any' ):1027            record_dict['record_view_dependency'] = record[1028                'record_view_dependency']1029          else:1030            record_dict['record_view_dependency'] = (1031                '%s_dep' % record['record_view_dependency'])1032          record_dict['record_zone_name'] = record['record_zone_name']1033          record_dict['record_last_user'] = record['record_last_user']1034          record_args_dict = self.ConstructRecordArgsDictFromRecordID(1035              record['records_id'])1036          self.user_instance.Authorize('ProcessRecordsBatch',1037              record_data = {1038                  'target': record['record_target'],1039                  'zone_name': record['record_zone_name'],1040                  'view_name': record_dict['record_view_dependency'],1041                  'record_type': 
record['record_type'],1042                  'record_args_dict': record_args_dict},1043              current_transaction=True)1044          rows_deleted = self.db_instance.RemoveRow('records', record_dict)1045          log_dict['delete'].append(record)1046          row_count += 11047          if( rows_deleted > 1 ):1048            raise errors.RecordsBatchError(1049                  'Record specification too broad, '1050                  'found %d matching records for %s.' % (1051                      rows_deleted, record_dict))1052          elif( rows_deleted == 0 ):1053            raise errors.RecordsBatchError(1054                  'No record found for :%s' % record_dict)1055        # ADD RECORDS1056        for record in add_records:1057          #Target length check1058          if( not self.db_instance.data_validation_instance.isTarget(1059              record[u'record_target']) ):1060            raise errors.InvalidInputError('Target hostname is invalid. %s' % (1061                record[u'record_target']))1062          view_name = record['record_view_dependency']1063          if( not record['record_view_dependency'].endswith('_dep') and record[1064                'record_view_dependency'] != u'any'):1065            view_name = '%s_dep' % record['record_view_dependency']1066          self.user_instance.Authorize('ProcessRecordsBatch',1067              record_data = {1068                  'target': record['record_target'],1069                  'zone_name': record['record_zone_name'],1070                  'view_name': view_name,1071                  'record_type': record['record_type'],1072                  'record_args_dict': record['record_arguments']},1073              current_transaction=True)1074          if( record['record_type'] == u'ptr' ):1075            if( record['record_arguments'][1076                'assignment_host'].startswith('@.') ):1077              record['record_arguments']['assignment_host'] = record[1078                  
'record_arguments']['assignment_host'].lstrip('@.')1079          changed_view_dep.append((view_name, record['record_zone_name']))1080          ttl = None1081          if( 'ttl' in record ):1082            ttl = record['ttl']1083          if( ttl is None ):1084            ttl = constants.DEFAULT_TTL1085          records_dict = {'records_id': None,1086                          'record_target': record['record_target'],1087                          'record_type': None,1088                          'record_ttl': None,1089                          'record_zone_name': record['record_zone_name'],1090                          'record_view_dependency': view_name,1091                          'record_last_user': None}1092          if( record['record_type'] == 'cname' ):1093            all_records = self.db_instance.ListRow('records', records_dict)1094            if( len(all_records) > 0 ):1095              raise errors.RecordsBatchError(1096                  'Record already exists with target %s.' % (1097                  record['record_target']))1098          records_dict['record_type'] = u'cname'1099          cname_records = self.db_instance.ListRow('records', records_dict)1100          if( len(cname_records) > 0 ):1101            raise errors.RecordsBatchError('CNAME already exists with target '1102                                           '%s.' 
% (record['record_target']))1103          record_args_assignment_dict = self.db_instance.GetEmptyRowDict(1104              'record_arguments_records_assignments')1105          records_dict['record_type'] = record['record_type']1106          raw_records = self.db_instance.ListRow(1107              'records', records_dict, 'record_arguments_records_assignments',1108              record_args_assignment_dict)1109          records_dict['record_last_user'] = self.user_instance.GetUserName()1110          records_dict['record_ttl'] = ttl1111          current_records = (1112              helpers_lib.GetRecordsFromRecordRowsAndArgumentRows(1113              raw_records, record['record_arguments']))1114          for current_record in current_records:1115            for arg in record['record_arguments'].keys():1116              if( arg not in current_record ):1117                break1118              if( record['record_arguments'][arg] is None ):1119                continue1120              if( record['record_arguments'][arg] != current_record[arg] ):1121                break1122            else:1123              raise errors.RecordsBatchError('Duplicate record found: %s' %1124                                             current_record)1125          records_dict['record_type'] = record['record_type']1126          record_id = self.db_instance.MakeRow('records', records_dict)1127          for arg in record['record_arguments'].keys():1128            record_argument_assignments_dict = {1129               'record_arguments_records_assignments_record_id': record_id,1130               'record_arguments_records_assignments_type': record[1131                   'record_type'],1132               'record_arguments_records_assignments_argument_name': arg,1133               'argument_value': unicode(record['record_arguments'][arg])}1134            self.db_instance.MakeRow('record_arguments_records_assignments',1135                                     record_argument_assignments_dict)1136   
         log_dict['add'].append(record)1137            row_count += 11138          if( records_dict['record_type'] in1139              constants.RECORD_TYPES_INDEXED_BY_IP ):1140            self.core_instance._AddRecordToIpIndex(1141                records_dict['record_type'], records_dict['record_zone_name'],1142                records_dict['record_view_dependency'],1143                record_id, records_dict['record_target'],1144                record['record_arguments'])1145        changed_view_dep = set(changed_view_dep)1146        for view_dep_pair in changed_view_dep:1147          self.core_instance._IncrementSoa(*view_dep_pair, missing_ok=zone_import)1148      except:1149        self.db_instance.EndTransaction(rollback=True)1150        raise1151      self.db_instance.EndTransaction()1152      success = True1153    finally:1154      self.log_instance.LogAction(self.user_instance.user_name, function_name,1155                                  current_args, success)1156    return row_count1157  def ListSortedHostsByZone(self, zone_name, view_name=None):1158    records_dict = self.ListRecordsByZone(zone_name, view_name=view_name)1159    sorted_records = self.SortRecordsByHost(records_dict)1160    hosts_dict = {}1161    for record in sorted_records:1162      direction = 'Reverse'1163      if( record['forward'] ):1164        direction = 'Forward'1165      if( not hosts_dict.has_key(record[u'view_name']) ):1166        hosts_dict.update({record[u'view_name']: []})1167      new_record = record.copy()1168      new_record['direction'] = direction1169      hosts_dict[record[u'view_name']].append(new_record)1170    return hosts_dict1171  def ListSortedHostsByCIDR(self, cidr, zone_name=None, view_name=None):1172    records_dict = self.ListRecordsByCIDRBlock(cidr, zone_name=zone_name, 1173      view_name=view_name)1174    ip_address_list = self.CIDRExpand(cidr)1175    1176    if ip_address_list is None:1177      ip_address_list = []1178    if( ip_address_list == [] ):1179   
   for view in records_dict:1180        ip_address_list.extend(records_dict[view].keys())1181      ip_address_list = list(set(ip_address_list))1182    hosts_dict = {}1183    if( len(records_dict) == 0 ):1184      hosts_dict['--'] = []1185      for ip_address in ip_address_list:1186        hosts_dict['--'].append(1187            {'host': '--', 'direction': '--',1188             'ip_address': ip_address, 'record_zone_name': '--'})1189    else:1190      for view in records_dict:1191        if( not hosts_dict.has_key(view) ):1192          hosts_dict.update({view: []})1193        for ip_address in ip_address_list:1194          if( ip_address in records_dict[view] ):1195            for record in records_dict[view][ip_address]:1196              direction = 'Reverse'1197              if( record['forward'] ):1198                direction = 'Forward'1199              new_record = record.copy()1200              new_record['direction'] = direction1201              new_record['ip_address'] = ip_address1202              hosts_dict[view].append(new_record)1203          else:1204            hosts_dict[view].append(1205                {'ip_address': ip_address, 'direction': '--', 'host': '--',1206                 'record_zone_name': '--'})1207    return hosts_dict...delete.py
Source:delete.py  
import tkinter as tk
from tkinter import ttk

from .style import Style


class DeleteRecord:
    """Dialog window that loads a record by ID and deletes it on request."""

    def __init__(self, file, menu):
        """Build the "Delete Record" window and all of its widgets.

        Args:
            file: Database wrapper. This class uses its ``cursor`` and ``db``
                attributes and its ``find_record`` method.
            menu: Owning menu; its ``refresh_btn()`` is called after a
                successful delete.
        """
        self.menu = menu
        self.file = file
        self.width = 500
        self.height = 400
        self.top = tk.Toplevel()
        self.top.title("Media Database - Delete Record")
        self.top.geometry("%dx%d" % (self.width, self.height))
        self.top.configure(bg=Style.bg)

        # creates the form
        self.title = tk.Label(self.top,
                              text="Delete a record",
                              bg=Style.table_top,
                              fg="white",
                              font=("Courier", Style.font_size),
                              borderwidth=2,
                              relief="ridge")
        self.title.grid(row=1, column=2, padx=110, pady=20)

        self.record_id_lbl = tk.Label(
            self.top,
            text="Please enter the record ID that you want to be deleted: ",
            bg=Style.bg)
        self.record_id_lbl.grid(row=2, column=2, pady=20)
        self.select_record = tk.Entry(self.top)
        self.select_record.grid(row=3, column=2)
        self.record_load_btn = ttk.Button(self.top, text="Load",
                                          style="W.TButton",
                                          command=self.load_record)
        self.record_load_btn.grid(row=4, column=2, pady=10)

        self.record_lbl = tk.Label(
            self.top,
            text="Are you sure you want to delete this record?",
            bg=Style.bg)
        self.record_lbl.grid(row=5, column=2, pady=10)
        self.record_entry = tk.Entry(self.top, state="readonly", width=50)
        self.record_entry.grid(row=6, column=2)
        self.btn_del = ttk.Button(self.top, text="Delete Record",
                                  command=self.delete_btn, style="W.TButton")
        self.btn_del.grid(row=7, column=2, pady=20)

        # Feedback line for validation errors and the delete confirmation.
        self.delete_lbl = tk.Label(self.top, text=" ", bg=Style.bg,
                                   font="calibri")
        self.delete_lbl.grid(row=8, column=2)

    def load_record(self):
        """Show the record with the entered ID in the read-only preview field."""
        if self.validate_input():
            record = self.file.find_record(
                int(self.select_record.get())).fetchone()
            # The entry is read-only, so unlock it while its text is replaced.
            self.record_entry.configure(state="normal")
            self.record_entry.delete(0, tk.END)
            self.record_entry.insert(0, str(record))
            self.record_entry.configure(state="readonly")

    def delete_btn(self):
        """Delete the entered record, then refresh the menu and close the window."""
        if self.delete_record():  # deletes the record
            # feedback for user
            self.delete_lbl.configure(
                text="Record has been deleted \nPlease close this window")
            self.menu.refresh_btn()
            self.top.destroy()

    def validate_input(self):
        """Check that the entered ID is an integer naming an existing record.

        On failure the reason is shown in the feedback label; on success the
        label is cleared.

        Returns:
            bool: True if the input is valid, False otherwise.
        """
        user_in = self.select_record.get()
        if len(user_in) == 0:
            self.delete_lbl.configure(text="Enter A Record ID")
            return False
        try:
            record_id = int(user_in)
        except ValueError:
            self.delete_lbl.configure(text="Invalid Record ID")
            return False

        # The return value of execute() says nothing about matches (sqlite3
        # returns the cursor itself, which is always truthy), so a row has
        # to be fetched to learn whether the record exists.
        cursor = self.file.cursor
        cursor.execute("SELECT * FROM records WHERE id = ?;", (record_id,))
        if cursor.fetchone() is None:
            self.delete_lbl.configure(text="Record does not exist")
            return False

        self.delete_lbl.configure(text=" ")
        return True

    def delete_record(self):
        """Delete the record with the entered ID and commit the change.

        Returns:
            bool: True if a record was deleted, False if the input was invalid.
        """
        if not self.validate_input():
            return False
        # Bind the validated integer rather than the raw entry text.
        record_id = int(self.select_record.get())
        self.file.cursor.execute("DELETE FROM records WHERE id = ?",
                                 (record_id,))
        self.file.db.commit()
        # delete_btn() only reports success when this returns a truthy value.
        return True
Source:recorder.py  
1from django.http import HttpResponse, HttpResponseBadRequest2from amcat.models.record import Record3from amcat.models.article import Article4from amcat.models.coding.codingjob import CodingJob5import dateutil.parser6import json7def record(request):8    record_list = json.loads(request.body)9    for record_entry in record_list:10        record_entry['user'] = request.user11        record_entry['ts'] = dateutil.parser.parse(record_entry['ts'])12        if 'article_id' in record_entry:13            article = Article.objects.get(id=record_entry['article_id'])14            record_entry['article'] = article15            del record_entry['article_id']16        if 'codingjob_id' in record_entry:17            codingjob = CodingJob.objects.get(id=record_entry['codingjob_id'])18            record_entry['codingjob'] = codingjob19            del record_entry['codingjob_id']20        Record.objects.create(**record_entry)            ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
