Best Python code snippet using Testify_python
test_network.py
Source:test_network.py  
1#    Licensed under the Apache License, Version 2.0 (the "License"); you may2#    not use this file except in compliance with the License. You may obtain3#    a copy of the License at4#5#         http://www.apache.org/licenses/LICENSE-2.06#7#    Unless required by applicable law or agreed to in writing, software8#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT9#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the10#    License for the specific language governing permissions and limitations11#    under the License.12import json13import uuid14from openstackclient.tests.functional import base15class NetworkTests(base.TestCase):16    """Functional tests for network"""17    def test_network_create(self):18        """Test create options, delete"""19        # Get project IDs20        cmd_output = json.loads(self.openstack('token issue -f json '))21        auth_project_id = cmd_output['project_id']22        cmd_output = json.loads(self.openstack('project list -f json '))23        admin_project_id = None24        demo_project_id = None25        for p in cmd_output:26            if p['Name'] == 'admin':27                admin_project_id = p['ID']28            if p['Name'] == 'demo':29                demo_project_id = p['ID']30        # Verify assumptions:31        # * admin and demo projects are present32        # * demo and admin are distinct projects33        # * tests run as admin34        self.assertIsNotNone(admin_project_id)35        self.assertIsNotNone(demo_project_id)36        self.assertNotEqual(admin_project_id, demo_project_id)37        self.assertEqual(admin_project_id, auth_project_id)38        # network create with no options39        name1 = uuid.uuid4().hex40        cmd_output = json.loads(self.openstack(41            'network create -f json ' +42            name143        ))44        self.addCleanup(self.openstack, 'network delete ' + name1)45        self.assertIsNotNone(cmd_output["id"])46        # 
Check the default values47        self.assertEqual(48            admin_project_id,49            cmd_output["project_id"],50        )51        self.assertEqual(52            '',53            cmd_output["description"],54        )55        self.assertEqual(56            'UP',57            cmd_output["admin_state_up"],58        )59        self.assertEqual(60            False,61            cmd_output["shared"],62        )63        self.assertEqual(64            'Internal',65            cmd_output["router:external"],66        )67        name2 = uuid.uuid4().hex68        cmd_output = json.loads(self.openstack(69            'network create -f json ' +70            '--project demo ' +71            name272        ))73        self.addCleanup(self.openstack, 'network delete ' + name2)74        self.assertIsNotNone(cmd_output["id"])75        self.assertEqual(76            demo_project_id,77            cmd_output["project_id"],78        )79        self.assertEqual(80            '',81            cmd_output["description"],82        )83    def test_network_delete(self):84        """Test create, delete multiple"""85        name1 = uuid.uuid4().hex86        cmd_output = json.loads(self.openstack(87            'network create -f json ' +88            '--description aaaa ' +89            name190        ))91        self.assertIsNotNone(cmd_output["id"])92        self.assertEqual(93            'aaaa',94            cmd_output["description"],95        )96        name2 = uuid.uuid4().hex97        cmd_output = json.loads(self.openstack(98            'network create -f json ' +99            '--description bbbb ' +100            name2101        ))102        self.assertIsNotNone(cmd_output["id"])103        self.assertEqual(104            'bbbb',105            cmd_output["description"],106        )107        del_output = self.openstack('network delete ' + name1 + ' ' + name2)108        self.assertOutput('', del_output)109    def test_network_list(self):110        """Test create defaults, list 
filters, delete"""111        name1 = uuid.uuid4().hex112        cmd_output = json.loads(self.openstack(113            'network create -f json ' +114            '--description aaaa ' +115            name1116        ))117        self.addCleanup(self.openstack, 'network delete ' + name1)118        self.assertIsNotNone(cmd_output["id"])119        self.assertEqual(120            'aaaa',121            cmd_output["description"],122        )123        # Check the default values124        self.assertEqual(125            'UP',126            cmd_output["admin_state_up"],127        )128        self.assertEqual(129            False,130            cmd_output["shared"],131        )132        self.assertEqual(133            'Internal',134            cmd_output["router:external"],135        )136        # NOTE(dtroyer): is_default is not present in the create output137        #                so make sure it stays that way.138        # NOTE(stevemar): is_default *is* present in SDK 0.9.11 and newer,139        #                 but the value seems to always be None, regardless140        #                 of the --default or --no-default value.141        # self.assertEqual('x', cmd_output)142        if ('is_default' in cmd_output):143            self.assertEqual(144                None,145                cmd_output["is_default"],146            )147        self.assertEqual(148            True,149            cmd_output["port_security_enabled"],150        )151        name2 = uuid.uuid4().hex152        cmd_output = json.loads(self.openstack(153            'network create -f json ' +154            '--description bbbb ' +155            '--disable ' +156            '--share ' +157            name2158        ))159        self.addCleanup(self.openstack, 'network delete ' + name2)160        self.assertIsNotNone(cmd_output["id"])161        self.assertEqual(162            'bbbb',163            cmd_output["description"],164        )165        self.assertEqual(166            'DOWN',167            
cmd_output["admin_state_up"],168        )169        self.assertEqual(170            True,171            cmd_output["shared"],172        )173        if ('is_default' in cmd_output):174            self.assertEqual(175                None,176                cmd_output["is_default"],177            )178        self.assertEqual(179            True,180            cmd_output["port_security_enabled"],181        )182        # Test list --long183        cmd_output = json.loads(self.openstack(184            "network list -f json " +185            "--long"186        ))187        col_name = [x["Name"] for x in cmd_output]188        self.assertIn(name1, col_name)189        self.assertIn(name2, col_name)190        # Test list --long --enable191        cmd_output = json.loads(self.openstack(192            "network list -f json " +193            "--enable " +194            "--long"195        ))196        col_name = [x["Name"] for x in cmd_output]197        self.assertIn(name1, col_name)198        self.assertNotIn(name2, col_name)199        # Test list --long --disable200        cmd_output = json.loads(self.openstack(201            "network list -f json " +202            "--disable " +203            "--long"204        ))205        col_name = [x["Name"] for x in cmd_output]206        self.assertNotIn(name1, col_name)207        self.assertIn(name2, col_name)208        # Test list --long --share209        cmd_output = json.loads(self.openstack(210            "network list -f json " +211            "--share " +212            "--long"213        ))214        col_name = [x["Name"] for x in cmd_output]215        self.assertNotIn(name1, col_name)216        self.assertIn(name2, col_name)217        # Test list --long --no-share218        cmd_output = json.loads(self.openstack(219            "network list -f json " +220            "--no-share " +221            "--long"222        ))223        col_name = [x["Name"] for x in cmd_output]224        self.assertIn(name1, col_name)225        
self.assertNotIn(name2, col_name)226    def test_network_set(self):227        """Tests create options, set, show, delete"""228        name = uuid.uuid4().hex229        cmd_output = json.loads(self.openstack(230            'network create -f json ' +231            '--description aaaa ' +232            '--enable ' +233            '--no-share ' +234            '--internal ' +235            '--no-default ' +236            '--enable-port-security ' +237            name238        ))239        self.addCleanup(self.openstack, 'network delete ' + name)240        self.assertIsNotNone(cmd_output["id"])241        self.assertEqual(242            'aaaa',243            cmd_output["description"],244        )245        self.assertEqual(246            'UP',247            cmd_output["admin_state_up"],248        )249        self.assertEqual(250            False,251            cmd_output["shared"],252        )253        self.assertEqual(254            'Internal',255            cmd_output["router:external"],256        )257        # NOTE(dtroyer): is_default is not present in the create output258        #                so make sure it stays that way.259        # NOTE(stevemar): is_default *is* present in SDK 0.9.11 and newer,260        #                 but the value seems to always be None, regardless261        #                 of the --default or --no-default value.262        if ('is_default' in cmd_output):263            self.assertEqual(264                None,265                cmd_output["is_default"],266            )267        self.assertEqual(268            True,269            cmd_output["port_security_enabled"],270        )271        raw_output = self.openstack(272            'network set ' +273            '--description cccc ' +274            '--disable ' +275            '--share ' +276            '--external ' +277            '--disable-port-security ' +278            name279        )280        self.assertOutput('', raw_output)281        cmd_output = 
json.loads(self.openstack(282            'network show -f json ' + name283        ))284        self.assertEqual(285            'cccc',286            cmd_output["description"],287        )288        self.assertEqual(289            'DOWN',290            cmd_output["admin_state_up"],291        )292        self.assertEqual(293            True,294            cmd_output["shared"],295        )296        self.assertEqual(297            'External',298            cmd_output["router:external"],299        )300        # why not 'None' like above??301        self.assertEqual(302            False,303            cmd_output["is_default"],304        )305        self.assertEqual(306            False,307            cmd_output["port_security_enabled"],308        )309        # NOTE(dtroyer): There is ambiguity around is_default in that310        #                it is not in the API docs and apparently can311        #                not be set when the network is --external,312        #                although the option handling code only looks at313        #                the value of is_default when external is True.314        raw_output = self.openstack(315            'network set ' +316            '--default ' +317            name318        )319        self.assertOutput('', raw_output)320        cmd_output = json.loads(self.openstack(321            'network show -f json ' + name322        ))323        self.assertEqual(324            'cccc',325            cmd_output["description"],326        )327        # NOTE(dtroyer): This should be 'True'328        self.assertEqual(329            False,330            cmd_output["port_security_enabled"],...smbtouch.py
Source:smbtouch.py  
12import ops.cmd3import util.mac4import dsz5from scanengine2 import scan6import os.path7import re8from xml.etree.ElementTree import ElementTree9from xml.etree.ElementTree import Element1011def _whats_your_job():12    return 'smbtouch\\|*'1314def _whats_your_name():15    return 'smbtouch'1617def _support_ipv6():18    return False1920class smbtouch(scan, ):2122    def __init__(self, job, timeout=60):23        scan.__init__(self, job)24        if (len(job) > 1):25            self.port = job[0].split('|')[1]26        self.scan_type = _whats_your_name()27        self.timeout = timeout2829    def execute_scan(self, verbose):30        redir_cmd = scan.gettunnel(self, self.target, 'tcp', self.port)31        PATH_TO_SMBTOUCH = scan.find_newest_touch(self, 'Smbtouch', 'exe')32        PATH_TO_SMBXML = scan.find_newest_touch(self, 'Smbtouch', 'xml')33        smbcmd = ops.cmd.getDszCommand('run', dszquiet=(not verbose))34        smb_cmd_list = []35        smb_cmd_list.append(('--InConfig %s' % PATH_TO_SMBXML))36        smb_cmd_list.append(('--TargetIp %s' % '127.0.0.1'))37        smb_cmd_list.append(('--TargetPort %s' % redir_cmd.lplisten))38        smb_cmd_list.append(('--NetworkTimeout %s' % self.timeout))39        if (int(self.port) == 445):40            smb_cmd_list.append(('--Protocol %s' % 'SMB'))41        elif (int(self.port) == 139):42            smb_cmd_list.append(('--Protocol %s' % 'NBT'))43        smb_cmd_list.append(('--Credentials %s' % 'Anonymous'))44        outconfig = os.path.join(ops.LOGDIR, 'Logs', ('%s_%s_%s.xml' % (os.path.basename(PATH_TO_SMBTOUCH), self.target, dsz.Timestamp())))45        smb_cmd_list.append(('--OutConfig %s' % outconfig))46        smb_cmd_string = ((PATH_TO_SMBTOUCH + ' ') + ' '.join(smb_cmd_list))47        smbcmd.command = ('cmd /C %s' % smb_cmd_string)48        smbcmd.arglist.append('-redirect')49        smbcmd.arglist.append(('-directory %s' % os.path.join(ops.DSZDISKSDIR, 'lib', 'x86-Windows')))50        
smbcmd.prefixes.append('local')51        smbcmd.prefixes.append('log')52        smbobject = smbcmd.execute()53        ops.networking.redirect.stop_tunnel(dsz_cmd=redir_cmd)54        cmd_output = {}55        cmd_output['error'] = None56        screenlog = os.path.join(ops.PROJECTLOGDIR, smbobject.commandmetadata.screenlog)57        f = open(screenlog, 'r')58        screenlog_lines = f.readlines()59        f.close()60        vulnerable = []61        not_vulnerable = []62        not_supported = []63        for line in screenlog_lines:64            re_out = re.search('Error 0x', line)65            if (re_out is not None):66                cmd_output['error'] = line.split('-')[(-1)].strip()67            re_out = re.search('ETERNAL', line)68            if (re_out is not None):69                line_split = line.split('-')70                exploit = line_split[0].strip()71                if (exploit == 'ETERNALBLUE'):72                    exploit = 'ETEB'73                elif (exploit == 'ETERNALROMANCE'):74                    exploit = 'ETRO'75                elif (exploit == 'ETERNALCHAMPION'):76                    exploit = 'ETCH'77                elif (exploit == 'ETERNALSYNERGY'):78                    exploit = 'ETSY'79                if ((re.search('FB', line) is not None) or (re.search('DANE', line) is not None)):80                    vulnerable.append(exploit)81                elif (re.search('not supported', line) is not None):82                    not_supported.append(exploit)83                else:84                    not_vulnerable.append(exploit)85        self.vulnerable = ','.join(vulnerable)86        self.not_vulnerable = ','.join(not_vulnerable)87        self.not_supported = ','.join(not_supported)88        if (cmd_output['error'] is None):89            tree = ElementTree()90            tree.parse(outconfig)91            root = tree.getroot()92            outparams = root.find('{urn:trch}outputparameters').getchildren()93            for ele in 
outparams:94                try:95                    cmd_output[ele.get('name')] = ele.find('{urn:trch}value').text96                except:97                    continue98        if ('Target' in cmd_output.keys()):99            self.os = cmd_output['Target']100        if ('TargetOsArchitecture' in cmd_output.keys()):101            self.arch = cmd_output['TargetOsArchitecture']102        if ('PipeName' in cmd_output.keys()):103            self.pipe = cmd_output['PipeName']104        if ('ShareName' in cmd_output.keys()):105            self.share = cmd_output['ShareName']106        if ('Credentials' in cmd_output.keys()):107            self.credentials = cmd_output['Credentials']108        self.error = cmd_output['error']109        self.timestamp = dsz.Timestamp()110        if ((cmd_output['error'] is None) or (not (cmd_output['error'] == 'ErrorConnectionTimedOut'))):111            self.success = True112113    def return_success_message(self):114        return ('SMBtouch response for %s' % self.target)115116    def verify_escalation(self, escalation_rule):117        smbtouch = self118        try:119            eval_res = eval(escalation_rule)120            if ((eval_res == True) or (eval_res == False)):121                return True122            else:123                return False124        except:125            return False126127    def check_escalation(self, escalation_rule):128        smbtouch = self129        try:130            if eval(escalation_rule):131                return True132            else:133                return False134        except:135            return False136137    def return_data(self):138        return scan.return_data(self)139140    def get_display_headers(self):141        return ['Targeted Address', 'Port', 'OS', 'Arch', 'Creds', 'Pipe', 'Error', 'Vulnerable', 'Not Vulnerable', 'Not Supported', 'Time Stamp']142143    def get_data_fields(self):144        return ['target', 'port', 'os', 'arch', 'credentials', 'pipe', 'error', 
'vulnerable', 'not_vulnerable', 'not_supported', 'timestamp']145146    def get_raw_fields(self):147        return (self.get_data_fields() + ['success'])148149    def verify_job(self, job):150        if ((not (len(job) == 2)) or (not (int(job[1]) in [139, 445]))):151            return False152        return True153154    def min_time(self):155        return 30156157    def min_range(self):
...smart_cli.py
Source:smart_cli.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3# Copyright (c) 2016, Intel Corporation. All rights reserved.4# This file is licensed under the GPLv2 license.5# For the full content of this license, see the LICENSE.txt6# file at the top level of this source tree.7import subprocess8import os9from .. import exception10from tools import logging_helper, shell_ops11import manage_config12class REPO_HANDLER(object):13    def __init__(self):14        self.__error = None15        self.__repo_number = 016        self.__result = dict()17        self.__response = dict()18        self.__log_helper = logging_helper.logging_helper.Logger()19    def add(self, repo_urls):20        self.__response['status'] = "failure"21        for url in repo_urls:22            self.__repo_number += 123            repo_name = manage_config.flex_repo_name + str(self.__repo_number)24            self.__log_helper.logger.debug(str(repo_name) + " " + str(url))25            command = "smart channel --add '" + repo_name + "' type=rpm-md baseurl=" + url + " -y"26            result = shell_ops.run_cmd_chk(command)27            if result['returncode']:28                if "error" in result['cmd_output']:29                    self.action(repo_name, 'remove', 'WR')30                    self.__error = "Failed to add repository: " + result['cmd_output'][result['cmd_output'].index("error:") + 7:].replace("\n", "")31                else:32                    self.__error = "Error adding update repository"33                self.__response['message'] = self.__error34                self.__log_helper.logger.error("Failed to add repository. 
Error output: '%s'" % self.__response['message'])35                return self.__response36            else:37                command = "smart update '" + repo_name + "'"38                result = shell_ops.run_cmd_chk(command)  # Attempt to connect to new repo39                if result['returncode']:  # If attempt fails determine error and remove repo40                    # Sometimes, some special repo URL will not be removed from the cache file when the repo is removed. (repo add and then repo remove)41                    # If this happens, then "smart update this repo" will fail. So we want to manually delete the cache file.42                    if "OSError: [Errno 2] No such file or directory" in result['cmd_output']:43                        self.__log_helper.logger.debug("trying to remove cache file and retry...")44                        if os.path.isfile(manage_config.smart_cache_file):45                            self.__log_helper.logger.debug(manage_config.smart_cache_file + ' exists, so we will try to remove it.')46                            try:47                                os.remove(manage_config.smart_cache_file)48                                self.__log_helper.logger.debug("Try updating again............")49                                result = shell_ops.run_cmd_chk(command)  # Attempt to connect to new repo50                                if not result['returncode']:  # If it works51                                    continue  # go to the next loop52                            except Exception as e:53                                self.__log_helper.logger.error(manage_config.smart_cache_file + ' cannot be removed. 
' + str(e))54                    if "error" in result['cmd_output']:55                        if "Invalid URL" in result['cmd_output']:56                            self.__response['message'] = "Failed to add repository: Invalid URL."57                        elif "Failed to connect" in result['cmd_output'] or 'URL returned error: 503' in result['cmd_output']:58                            self.__response['message'] = "Failed to add repository: Unable to connect to repository."59                        elif "Invalid XML" in result['cmd_output']:60                            self.__response['message'] = "Failed to add repository: Repository XML file invalid. "61                        else:62                            self.__response['message'] = result['cmd_output'][result['cmd_output'].index("error:") + 7:].replace("\n", "")63                    else:64                        self.__response['message'] = 'Error adding repository: ' + str(result['cmd_output'])65                    self.action(repo_name, 'remove', 'WR')66                    self.__log_helper.logger.error("Failed to add repository. 
Error output: '%s'" % self.__response['message'])67                    return self.__response68        self.__response['status'] = "success"69        self.__log_helper.logger.debug(str(self.__response))70        return self.__response71    def action(self, m_repo, action, type):72        cmd_output = dict()73        for repo in m_repo:74            if type == 'not_WR' and manage_config.flex_repo_name not in repo:75                self.__log_helper.logger.debug(str(action) + " " + str(repo))76                cmd_output = shell_ops.run_cmd_chk("smart channel --" + action + " " + repo + " -y")77                if cmd_output['returncode']:78                    return cmd_output79            if type == 'WR' and manage_config.flex_repo_name in repo:80                self.__log_helper.logger.debug(str(action) + " " + str(repo))81                cmd_output = shell_ops.run_cmd_chk("smart channel --" + action + " " + repo + " -y")82                if cmd_output['returncode']:83                    return cmd_output84        if action == 'remove':85            cmd_output = shell_ops.run_cmd_chk("smart update")86        cmd_output['returncode'] = 0...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
