Best Python code snippet using localstack_python
kerberos_common.py
Source: kerberos_common.py
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import base64
import getpass
import os
import string
import subprocess
import sys
import tempfile
from tempfile import gettempdir
from resource_management import *
from utils import get_property_value
from ambari_commons.os_utils import remove_file
from ambari_agent import Constants


class KerberosScript(Script):
  """
  Shared helpers for Kerberos-related scripts: writing krb5.conf style
  sections, invoking kadmin, managing principals and keytab files, and
  testing kinit with a given identity.
  """

  # Keys that _write_conf_realm emits inside a realm block; others are skipped.
  KRB5_REALM_PROPERTIES = [
    'kdc',
    'admin_server',
    'default_domain',
    'master_kdc'
  ]

  KRB5_SECTION_NAMES = [
    'libdefaults',
    'logging',
    'realms',
    'domain_realm',
    'capaths',
    'ca_paths',
    'appdefaults',
    'plugins'
  ]

  @staticmethod
  def create_random_password():
    """
    Returns a 13 character password made of ASCII letters and digits.

    The characters are drawn from the operating system's random source
    (random.SystemRandom) because the value is used as a credential.
    """
    import random
    chars = string.digits + string.ascii_letters
    rng = random.SystemRandom()
    return ''.join(rng.choice(chars) for _ in range(13))

  @staticmethod
  def write_conf_section(output_file, section_name, section_data):
    """
    Writes a "[section_name]" header followed by one " key = value" line per
    entry of section_data.  Nothing is written when section_name is None.

    :param output_file: object with a write() method
    :param section_name: the section name, or None
    :param section_data: dictionary of key/value pairs, or None
    """
    if section_name is None:
      return

    output_file.write('[%s]\n' % section_name)
    if section_data is not None:
      for key, value in section_data.items():
        output_file.write(" %s = %s\n" % (key, value))

  @staticmethod
  def _write_conf_realm(output_file, realm_name, realm_data):
    """ Writes out realm details

    Only keys listed in KRB5_REALM_PROPERTIES are written.

    Example:
     EXAMPLE.COM = {
      kdc = kerberos.example.com
      admin_server = kerberos.example.com
     }
    """
    if realm_name is None:
      return

    output_file.write(" %s = {\n" % realm_name)
    if realm_data is not None:
      for key, value in realm_data.items():
        if key in KerberosScript.KRB5_REALM_PROPERTIES:
          output_file.write("  %s = %s\n" % (key, value))
    output_file.write(" }\n")

  @staticmethod
  def write_conf_realms_section(output_file, section_name, realms_data):
    """
    Writes a "[section_name]" header followed by one realm block per entry of
    realms_data, each block followed by a blank line.  Nothing is written when
    section_name is None.

    :param output_file: object with a write() method
    :param section_name: the section name, or None
    :param realms_data: dictionary mapping realm name to realm properties, or None
    """
    if section_name is None:
      return

    output_file.write('[%s]\n' % section_name)
    if realms_data is not None:
      for realm, realm_data in realms_data.items():
        KerberosScript._write_conf_realm(output_file, realm, realm_data)
        output_file.write('\n')

  @staticmethod
  def write_krb5_conf():
    """
    Declares the krb5.conf directory (params.krb5_conf_dir) and file
    (params.krb5_conf_path), both owned by root:root, with the file content
    rendered from params.krb5_conf_template.
    """
    import params

    Directory(params.krb5_conf_dir,
              owner='root',
              create_parents = True,
              group='root',
              mode=0o755
    )

    content = InlineTemplate(params.krb5_conf_template)

    File(params.krb5_conf_path,
         content=content,
         owner='root',
         group='root',
         mode=0o644
    )

  @staticmethod
  def _write_temp_keytab(keytab_base64):
    """
    Decodes base64-encoded keytab data into a new temporary file.

    :param keytab_base64: the base64-encoded keytab data
    :return: the path of the temporary file; the caller must remove it
    """
    keytab_data = base64.b64decode(keytab_base64)
    (fd, path) = tempfile.mkstemp()
    try:
      try:
        os.write(fd, keytab_data)
      finally:
        os.close(fd)
    except Exception:
      # Do not leave a partially written keytab behind.
      os.remove(path)
      raise
    return path

  @staticmethod
  def invoke_kadmin(query, admin_identity=None, default_realm=None):
    """
    Executes the kadmin or kadmin.local command (depending on whether
    admin_identity provides a principal or not) and returns the command
    result code and standard out data.

    When a principal is given, its 'password' is used if present; otherwise its
    base64-encoded 'keytab' is written to a temporary file that is removed once
    the command finishes; if neither is present, kadmin is invoked with -k only.

    :param query: the kadmin query to execute
    :param admin_identity: the identity for the administrative user (optional)
    :param default_realm: the default realm to assume
    :return: return_code, out (the result of shell.checked_call); None if query is None or empty
    """
    if (query is None) or (len(query) == 0):
      return None

    auth_principal = None
    if admin_identity is not None:
      auth_principal = get_property_value(admin_identity, 'principal')

    auth_keytab_file = None
    try:
      if auth_principal is None:
        kadmin = 'kadmin.local'
        credential = ''
      else:
        kadmin = 'kadmin -p "%s"' % auth_principal
        auth_password = get_property_value(admin_identity, 'password')

        if auth_password is not None:
          credential = '-w "%s"' % auth_password
        else:
          auth_keytab = get_property_value(admin_identity, 'keytab')
          if auth_keytab is not None:
            auth_keytab_file = KerberosScript._write_temp_keytab(auth_keytab)
            credential = '-k -t %s' % auth_keytab_file
          else:
            # No keytab data supplied: pass -k without -t rather than the
            # literal "-t None" that string formatting would produce.
            credential = '-k'

      if (default_realm is not None) and (len(default_realm) > 0):
        realm = '-r %s' % default_realm
      else:
        realm = ''

      # The query is wrapped in double quotes, so embedded ones are escaped.
      command = '%s %s %s -q "%s"' % (kadmin, credential, realm, query.replace('"', '\\"'))
      return shell.checked_call(command)
    finally:
      if auth_keytab_file is not None:
        os.remove(auth_keytab_file)

  @staticmethod
  def create_keytab_file(principal, path, auth_identity=None):
    """
    Runs a kadmin "ktadd" query for the principal, optionally targeting path.

    "-norandkey" is added when no auth_identity is supplied (the query then
    runs through kadmin.local, see invoke_kadmin).

    :param principal: the principal to export
    :param path: the keytab file path (optional)
    :param auth_identity: the identity for the administrative user (optional)
    :return: True if kadmin returned 0; False if principal is None or empty
    :raises Fail: if invoking kadmin raises
    """
    success = False

    if (principal is not None) and (len(principal) > 0):
      if (auth_identity is None) or (len(auth_identity) == 0):
        norandkey = '-norandkey'
      else:
        norandkey = ''

      if (path is not None) and (len(path) > 0):
        keytab_file = '-k %s' % path
      else:
        keytab_file = ''

      try:
        result_code, output = KerberosScript.invoke_kadmin(
          'ktadd %s %s %s' % (keytab_file, norandkey, principal),
          auth_identity)

        success = (result_code == 0)
      except Exception:
        raise Fail("Failed to create keytab for principal: %s (in %s)" % (principal, path))

    return success

  @staticmethod
  def create_keytab(principal, auth_identity=None):
    """
    Creates a keytab for the principal in a temporary file and returns its
    content base64-encoded.  The temporary file is always removed.

    :param principal: the principal to export
    :param auth_identity: the identity for the administrative user (optional)
    :return: the base64-encoded keytab data, or None if no keytab was created
    """
    keytab = None

    # mkstemp is used only to reserve a unique name: the descriptor is closed
    # and the empty file removed before kadmin is asked to write to that path
    # (presumably so that ktadd creates the keytab from scratch).
    (fd, temp_path) = tempfile.mkstemp()
    os.close(fd)
    os.remove(temp_path)

    try:
      if KerberosScript.create_keytab_file(principal, temp_path, auth_identity):
        # Keytab data is binary.
        with open(temp_path, 'rb') as f:
          keytab = base64.b64encode(f.read())
    finally:
      if os.path.isfile(temp_path):
        os.remove(temp_path)

    return keytab

  @staticmethod
  def principal_exists(identity, auth_identity=None):
    """
    Determines whether the identity's principal exists by running a kadmin
    "getprinc" query and looking for "Principal: <principal>" in its output.

    :param identity: the identity holding the 'principal' to look up
    :param auth_identity: the identity for the administrative user (optional)
    :return: True if the principal was found in the output, otherwise False
    :raises Fail: if invoking kadmin raises
    """
    exists = False

    if identity is not None:
      principal = get_property_value(identity, 'principal')

      if (principal is not None) and (len(principal) > 0):
        try:
          result_code, output = KerberosScript.invoke_kadmin('getprinc %s' % principal,
                                                             auth_identity)
          exists = (output is not None) and (("Principal: %s" % principal) in output)
        except Exception:
          raise Fail("Failed to determine if principal exists: %s" % principal)

    return exists

  @staticmethod
  def change_principal_password(identity, auth_identity=None):
    """
    Runs a kadmin "change_password" query for the identity's principal, using
    the identity's 'password' if present or "-randkey" otherwise.

    :param identity: the identity holding 'principal' and optionally 'password'
    :param auth_identity: the identity for the administrative user (optional)
    :return: True if kadmin returned 0; False if there is no principal
    :raises Fail: if invoking kadmin raises
    """
    success = False

    if identity is not None:
      principal = get_property_value(identity, 'principal')

      if (principal is not None) and (len(principal) > 0):
        password = get_property_value(identity, 'password')

        if password is None:
          credentials = '-randkey'
        else:
          credentials = '-pw "%s"' % password

        try:
          result_code, output = KerberosScript.invoke_kadmin(
            'change_password %s %s' % (credentials, principal),
            auth_identity)

          success = (result_code == 0)
        except Exception:
          raise Fail("Failed to change password for principal: %s" % principal)

    return success

  @staticmethod
  def create_principal(identity, auth_identity=None):
    """
    Runs a kadmin "addprinc" query for the identity's principal, using the
    identity's 'password' if present or "-randkey" otherwise.

    :param identity: the identity holding 'principal' and optionally 'password'
    :param auth_identity: the identity for the administrative user (optional)
    :return: True if kadmin returned 0; False if there is no principal
    :raises Fail: if invoking kadmin raises
    """
    success = False

    if identity is not None:
      principal = get_property_value(identity, 'principal')

      if (principal is not None) and (len(principal) > 0):
        password = get_property_value(identity, 'password')

        if password is None:
          credentials = '-randkey'
        else:
          credentials = '-pw "%s"' % password

        try:
          result_code, out = KerberosScript.invoke_kadmin(
            'addprinc %s %s' % (credentials, principal),
            auth_identity)

          success = (result_code == 0)
        except Exception:
          raise Fail("Failed to create principal: %s" % principal)

    return success

  @staticmethod
  def clear_tmp_cache():
    """
    Declares deletion of the "curl_krb_cache" directory under
    Constants.AGENT_TMP_DIR, or under the system temporary directory when that
    constant is None.
    """
    tmp_dir = Constants.AGENT_TMP_DIR
    if tmp_dir is None:
      tmp_dir = gettempdir()

    curl_krb_cache_path = os.path.join(tmp_dir, "curl_krb_cache")
    Directory(curl_krb_cache_path, action="delete")

  @staticmethod
  def create_principals(identities, auth_identity=None):
    """
    Calls create_principal for every identity in identities (if not None).

    :param identities: iterable of identities, or None
    :param auth_identity: the identity for the administrative user (optional)
    """
    if identities is not None:
      for identity in identities:
        KerberosScript.create_principal(identity, auth_identity)

  @staticmethod
  def create_or_update_administrator_identity():
    """
    Looks up the 'admin_identity' of params.realm and changes its password if
    the principal already exists, or creates the principal otherwise.  Does
    nothing when params.realm is None.
    """
    import params

    if params.realm is not None:
      admin_identity = params.get_property_value(params.realm, 'admin_identity')

      if KerberosScript.principal_exists(admin_identity):
        KerberosScript.change_principal_password(admin_identity)
      else:
        KerberosScript.create_principal(admin_identity)

  @staticmethod
  def test_kinit(identity, user="root"):
    """
    Tests that a ticket can be obtained for the identity's principal and then
    runs kdestroy.  Credentials are tried in this order: an existing
    'keytab_file' (with "_HOST" replaced by params.hostname), base64-encoded
    'keytab' data written to a temporary file, then 'password'.

    :param identity: the identity to test
    :param user: the user passed to Execute when running kinit with a keytab
    :return: the result of shell.checked_call for kdestroy, or (0, '') when
             there is no principal or no usable credential
    :raises Fail: if kinit with a password exits with a non-zero return code
    """
    principal = get_property_value(identity, 'principal')
    search_paths = default('/configurations/kerberos-env/executable_search_paths', None)
    kinit_path_local = functions.get_kinit_path(search_paths)
    kdestroy_path_local = functions.get_kdestroy_path(search_paths)

    if principal is None:
      return 0, ''

    keytab_file = get_property_value(identity, 'keytab_file')
    keytab = get_property_value(identity, 'keytab')
    password = get_property_value(identity, 'password')

    if keytab_file is not None:
      # Resolve the host placeholder before checking that the file exists.
      import params
      keytab_file = keytab_file.replace("_HOST", params.hostname)

    # If a test keytab file is available, simply use it
    if (keytab_file is not None) and os.path.isfile(keytab_file):
      command = '%s -k -t %s %s' % (kinit_path_local, keytab_file, principal)
      Execute(command,
        user = user,
      )
      return shell.checked_call(kdestroy_path_local)

    # If base64-encoded test keytab data is available; then decode it, write it to a temporary file
    # use it, and then remove the temporary file
    if keytab is not None:
      test_keytab_file = None
      try:
        test_keytab_file = KerberosScript._write_temp_keytab(keytab)
        command = '%s -k -t %s %s' % (kinit_path_local, test_keytab_file, principal)
        Execute(command,
          user = user,
        )
        return shell.checked_call(kdestroy_path_local)
      finally:
        if test_keytab_file is not None:
          os.remove(test_keytab_file)

    # If no keytab data is available and a password was supplied, simply use it.
    if password is not None:
      # stderr is captured so that it can be reported in the failure message.
      process = subprocess.Popen([kinit_path_local, principal],
                                 stdin=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
      stdout, stderr = process.communicate(password)
      if process.returncode:
        err_msg = Logger.filter_text("Execution of kinit returned %d. %s" % (process.returncode, stderr))
        raise Fail(err_msg)
      return shell.checked_call(kdestroy_path_local)

    return 0, ''

  def write_keytab_file(self):
    """
    For every item of params.kerberos_command_params that carries both
    'keytab_content_base64' and 'keytab_file_path', declares the keytab file
    (owner root:root, read/write for owner and group) with the decoded
    content, and records "principal -> keytab path" under the 'keytabs' key of
    the structured output.  "_HOST" in paths and principals is replaced by
    params.hostname.
    """
    import params
    import stat

    if params.kerberos_command_params is None:
      return

    for item in params.kerberos_command_params:
      keytab_content_base64 = get_property_value(item, 'keytab_content_base64')
      if (keytab_content_base64 is None) or (len(keytab_content_base64) == 0):
        continue

      keytab_file_path = get_property_value(item, 'keytab_file_path')
      if (keytab_file_path is None) or (len(keytab_file_path) == 0):
        continue

      keytab_file_path = keytab_file_path.replace("_HOST", params.hostname)
      head, tail = os.path.split(keytab_file_path)
      if head:
        Directory(head, create_parents = True, mode=0o755, owner="root", group="root")

      owner = "root"
      group = "root"
      mode = stat.S_IREAD | stat.S_IWRITE | stat.S_IRGRP | stat.S_IWGRP

      keytab_content = base64.b64decode(keytab_content_base64)

      # to hide content in command output
      def make_lambda(data):
        return lambda: data

      File(keytab_file_path,
           content=make_lambda(keytab_content),
           mode=mode,
           owner=owner,
           group=group)

      principal = get_property_value(item, 'principal')
      if principal is not None:
        curr_content = Script.structuredOut

        if "keytabs" not in curr_content:
          curr_content['keytabs'] = {}

        curr_content['keytabs'][principal.replace("_HOST", params.hostname)] = keytab_file_path

        self.put_structured_out(curr_content)

  def delete_keytab_file(self):
    """
    For every item of params.kerberos_command_params that carries a
    'keytab_file_path', declares deletion of that file and records
    "principal -> '_REMOVED_'" under the 'keytabs' key of the structured
    output.  "_HOST" in paths and principals is replaced by params.hostname.
    """
    import params

    if params.kerberos_command_params is None:
      return

    for item in params.kerberos_command_params:
      keytab_file_path = get_property_value(item, 'keytab_file_path')
      if (keytab_file_path is None) or (len(keytab_file_path) == 0):
        continue

      keytab_file_path = keytab_file_path.replace("_HOST", params.hostname)

      # Delete the keytab file
      File(keytab_file_path, action="delete")

      principal = get_property_value(item, 'principal')
      if principal is not None:
        curr_content = Script.structuredOut

        if "keytabs" not in curr_content:
          curr_content['keytabs'] = {}

        curr_content['keytabs'][principal.replace("_HOST", params.hostname)] = '_REMOVED_'

        # NOTE(review): the listing this was recovered from is truncated right
        # after the assignment above; publishing the update here mirrors
        # write_keytab_file -- confirm against the complete file.
        self.put_structured_out(curr_content)
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
