Best Python code snippet using tempest_python
test_host.py
Source:test_host.py  
1"""2    :codeauthor: Rahul Handay <rahulha@saltstack.com>3"""4import logging5import pytest6import salt.states.host as host7from tests.support.mock import MagicMock, call, patch8log = logging.getLogger(__name__)9hostname = "salt"10localhost_ip = "127.0.0.1"11ip_list = ["203.0.113.113", "203.0.113.14"]12default_hosts = {13    ip_list[0]: {"aliases": [hostname]},14    ip_list[1]: {"aliases": [hostname]},15}16@pytest.fixture17def configure_loader_modules():18    return {19        host: {"__opts__": {"test": False}},20    }21def test_present():22    """23    Test to ensures that the named host is present with the given ip24    """25    add_host = MagicMock(return_value=True)26    rm_host = MagicMock(return_value=True)27    hostname = "salt"28    ip_str = "127.0.0.1"29    ip_list = ["10.1.2.3", "10.4.5.6"]30    # Case 1: No match for hostname. Single IP address passed to the state.31    list_hosts = MagicMock(return_value={"127.0.0.1": {"aliases": ["localhost"]}})32    with patch.dict(33        host.__salt__,34        {35            "hosts.list_hosts": list_hosts,36            "hosts.add_host": add_host,37            "hosts.rm_host": rm_host,38        },39    ):40        ret = host.present(hostname, ip_str)41        assert ret["result"] is True42        assert ret["comment"] == "Added host {} ({})".format(hostname, ip_str), ret[43            "comment"44        ]45        assert ret["changes"] == {"added": {ip_str: [hostname]}}, ret["changes"]46        expected = [call(ip_str, hostname)]47        assert add_host.mock_calls == expected, add_host.mock_calls48        assert rm_host.mock_calls == [], rm_host.mock_calls49    # Case 2: No match for hostname. 
Multiple IP addresses passed to the50    # state.51    list_hosts = MagicMock(return_value={"127.0.0.1": {"aliases": ["localhost"]}})52    add_host.reset_mock()53    rm_host.reset_mock()54    with patch.dict(55        host.__salt__,56        {57            "hosts.list_hosts": list_hosts,58            "hosts.add_host": add_host,59            "hosts.rm_host": rm_host,60        },61    ):62        ret = host.present(hostname, ip_list)63        assert ret["result"] is True64        assert "Added host {} ({})".format(hostname, ip_list[0]) in ret["comment"]65        assert "Added host {} ({})".format(hostname, ip_list[1]) in ret["comment"]66        assert ret["changes"] == {67            "added": {ip_list[0]: [hostname], ip_list[1]: [hostname]}68        }, ret["changes"]69        expected = sorted([call(x, hostname) for x in ip_list])70        assert sorted(add_host.mock_calls) == expected, add_host.mock_calls71        assert rm_host.mock_calls == [], rm_host.mock_calls72    # Case 3: Match for hostname, but no matching IP. 
Single IP address73    # passed to the state.74    list_hosts = MagicMock(75        return_value={76            "127.0.0.1": {"aliases": ["localhost"]},77            ip_list[0]: {"aliases": [hostname]},78        }79    )80    add_host.reset_mock()81    rm_host.reset_mock()82    with patch.dict(83        host.__salt__,84        {85            "hosts.list_hosts": list_hosts,86            "hosts.add_host": add_host,87            "hosts.rm_host": rm_host,88        },89    ):90        ret = host.present(hostname, ip_str)91        assert ret["result"] is True92        assert "Added host {} ({})".format(hostname, ip_str) in ret["comment"]93        assert (94            "Host {} present for IP address {}".format(hostname, ip_list[0])95            in ret["warnings"][0]96        )97        assert ret["changes"] == {"added": {ip_str: [hostname]}}, ret["changes"]98        expected = [call(ip_str, hostname)]99        assert add_host.mock_calls == expected, add_host.mock_calls100        assert rm_host.mock_calls == [], rm_host.mock_calls101    # Case 3a: Repeat the above with clean=True102    add_host.reset_mock()103    rm_host.reset_mock()104    with patch.dict(105        host.__salt__,106        {107            "hosts.list_hosts": list_hosts,108            "hosts.add_host": add_host,109            "hosts.rm_host": rm_host,110        },111    ):112        ret = host.present(hostname, ip_str, clean=True)113        assert ret["result"] is True114        assert "Added host {} ({})".format(hostname, ip_str) in ret["comment"]115        assert "Removed host {} ({})".format(hostname, ip_list[0]) in ret["comment"]116        assert ret["changes"] == {117            "added": {ip_str: [hostname]},118            "removed": {ip_list[0]: [hostname]},119        }, ret["changes"]120        expected = [call(ip_str, hostname)]121        assert add_host.mock_calls == expected, add_host.mock_calls122        expected = [call(ip_list[0], hostname)]123        assert rm_host.mock_calls == expected, 
rm_host.mock_calls124    # Case 4: Match for hostname, but no matching IP. Multiple IP addresses125    # passed to the state.126    cur_ip = "1.2.3.4"127    list_hosts = MagicMock(128        return_value={129            "127.0.0.1": {"aliases": ["localhost"]},130            cur_ip: {"aliases": [hostname]},131        }132    )133    add_host.reset_mock()134    rm_host.reset_mock()135    with patch.dict(136        host.__salt__,137        {138            "hosts.list_hosts": list_hosts,139            "hosts.add_host": add_host,140            "hosts.rm_host": rm_host,141        },142    ):143        ret = host.present(hostname, ip_list)144        assert ret["result"] is True145        assert "Added host {} ({})".format(hostname, ip_list[0]) in ret["comment"]146        assert "Added host {} ({})".format(hostname, ip_list[1]) in ret["comment"]147        assert ret["changes"] == {148            "added": {ip_list[0]: [hostname], ip_list[1]: [hostname]},149        }, ret["changes"]150        expected = sorted([call(x, hostname) for x in ip_list])151        assert sorted(add_host.mock_calls) == expected, add_host.mock_calls152        assert rm_host.mock_calls == [], rm_host.mock_calls153    # Case 4a: Repeat the above with clean=True154    add_host.reset_mock()155    rm_host.reset_mock()156    with patch.dict(157        host.__salt__,158        {159            "hosts.list_hosts": list_hosts,160            "hosts.add_host": add_host,161            "hosts.rm_host": rm_host,162        },163    ):164        ret = host.present(hostname, ip_list, clean=True)165        assert ret["result"] is True166        assert "Added host {} ({})".format(hostname, ip_list[0]) in ret["comment"]167        assert "Added host {} ({})".format(hostname, ip_list[1]) in ret["comment"]168        assert "Removed host {} ({})".format(hostname, cur_ip) in ret["comment"]169        assert ret["changes"] == {170            "added": {ip_list[0]: [hostname], ip_list[1]: [hostname]},171            "removed": 
{cur_ip: [hostname]},172        }, ret["changes"]173        expected = sorted([call(x, hostname) for x in ip_list])174        assert sorted(add_host.mock_calls) == expected, add_host.mock_calls175        expected = [call(cur_ip, hostname)]176        assert rm_host.mock_calls == expected, rm_host.mock_calls177    # Case 5: Multiple IP addresses passed to the state. One of them178    # matches, the other does not. There is also a non-matching IP that179    # must be removed.180    cur_ip = "1.2.3.4"181    list_hosts = MagicMock(182        return_value={183            "127.0.0.1": {"aliases": ["localhost"]},184            cur_ip: {"aliases": [hostname]},185            ip_list[0]: {"aliases": [hostname]},186        }187    )188    add_host.reset_mock()189    rm_host.reset_mock()190    with patch.dict(191        host.__salt__,192        {193            "hosts.list_hosts": list_hosts,194            "hosts.add_host": add_host,195            "hosts.rm_host": rm_host,196        },197    ):198        ret = host.present(hostname, ip_list)199        assert ret["result"] is True200        assert "Added host {} ({})".format(hostname, ip_list[1]) in ret["comment"]201        assert ret["changes"] == {"added": {ip_list[1]: [hostname]}}, ret["changes"]202        expected = [call(ip_list[1], hostname)]203        assert add_host.mock_calls == expected, add_host.mock_calls204        assert rm_host.mock_calls == [], rm_host.mock_calls205    # Case 5a: Repeat the above with clean=True206    add_host.reset_mock()207    rm_host.reset_mock()208    with patch.dict(209        host.__salt__,210        {211            "hosts.list_hosts": list_hosts,212            "hosts.add_host": add_host,213            "hosts.rm_host": rm_host,214        },215    ):216        ret = host.present(hostname, ip_list, clean=True)217        assert ret["result"] is True218        assert "Added host {} ({})".format(hostname, ip_list[1]) in ret["comment"]219        assert "Removed host {} ({})".format(hostname, 
cur_ip) in ret["comment"]220        assert ret["changes"] == {221            "added": {ip_list[1]: [hostname]},222            "removed": {cur_ip: [hostname]},223        }, ret["changes"]224        expected = [call(ip_list[1], hostname)]225        assert add_host.mock_calls == expected, add_host.mock_calls226        expected = [call(cur_ip, hostname)]227        assert rm_host.mock_calls == expected, rm_host.mock_calls228    # Case 6: Single IP address passed to the state, which matches the229    # current configuration for that hostname. No changes should be made.230    list_hosts = MagicMock(return_value={ip_str: {"aliases": [hostname]}})231    add_host.reset_mock()232    rm_host.reset_mock()233    with patch.dict(234        host.__salt__,235        {236            "hosts.list_hosts": list_hosts,237            "hosts.add_host": add_host,238            "hosts.rm_host": rm_host,239        },240    ):241        ret = host.present(hostname, ip_str)242        assert ret["result"] is True243        assert (244            ret["comment"]245            == "Host {} ({}) already present".format(hostname, ip_str)246            in ret["comment"]247        )248        assert ret["changes"] == {}, ret["changes"]249        assert add_host.mock_calls == [], add_host.mock_calls250        assert rm_host.mock_calls == [], rm_host.mock_calls251    # Case 7: Multiple IP addresses passed to the state, which both match252    # the current configuration for that hostname. 
No changes should be253    # made.254    list_hosts = MagicMock(255        return_value={256            ip_list[0]: {"aliases": [hostname]},257            ip_list[1]: {"aliases": [hostname]},258        }259    )260    add_host.reset_mock()261    rm_host.reset_mock()262    with patch.dict(263        host.__salt__,264        {265            "hosts.list_hosts": list_hosts,266            "hosts.add_host": add_host,267            "hosts.rm_host": rm_host,268        },269    ):270        ret = host.present(hostname, ip_list)271        assert ret["result"] is True272        assert (273            "Host {} ({}) already present".format(hostname, ip_list[0])274            in ret["comment"]275        )276        assert (277            "Host {} ({}) already present".format(hostname, ip_list[1])278            in ret["comment"]279        )280        assert ret["changes"] == {}, ret["changes"]281        assert add_host.mock_calls == [], add_host.mock_calls282        assert rm_host.mock_calls == [], rm_host.mock_calls283    # Case 8: Passing a comment to host.present with multiple IPs284    list_hosts = MagicMock(return_value={"127.0.0.1": {"aliases": ["localhost"]}})285    add_host_mock = MagicMock(return_value=True)286    rm_host_mock = MagicMock(return_value=True)287    set_comment_mock = MagicMock(return_value=True)288    add_host_mock.reset_mock()289    rm_host_mock.reset_mock()290    set_comment_mock.reset_mock()291    with patch.dict(292        host.__salt__,293        {294            "hosts.list_hosts": list_hosts,295            "hosts.add_host": add_host_mock,296            "hosts.rm_host": rm_host_mock,297            "hosts.set_comment": set_comment_mock,298        },299    ):300        ret = host.present(hostname, ip_list, comment="A comment")301        assert ret["result"] is True302        assert "Added host {} ({})".format(hostname, ip_list[0]) in ret["comment"]303        assert "Added host {} ({})".format(hostname, ip_list[1]) in ret["comment"]304        assert (305 
           "Set comment for host {} (A comment)".format(ip_list[0]) in ret["comment"]306        )307        assert (308            "Set comment for host {} (A comment)".format(ip_list[1]) in ret["comment"]309        )310        assert ret["changes"] == {311            "added": {ip_list[0]: [hostname], ip_list[1]: [hostname]},312            "comment_added": {ip_list[0]: ["A comment"], ip_list[1]: ["A comment"]},313        }, ret["changes"]314        expected = sorted([call(x, hostname) for x in ip_list])315        assert sorted(add_host_mock.mock_calls) == expected, add_host_mock.mock_calls316        expected = sorted([call(x, "A comment") for x in ip_list])317        assert (318            sorted(set_comment_mock.mock_calls) == expected319        ), set_comment_mock.mock_calls320        assert rm_host_mock.mock_calls == [], rm_host_mock.mock_calls321def test_host_present_should_return_True_if_test_and_no_changes():322    add_host_mock = MagicMock(return_value=True)323    rm_host_mock = MagicMock(return_value=True)324    expected = {325        "comment": "Host {} ({}) already present".format(326            hostname,327            ip_list[0],328        ),329        "changes": {},330        "name": hostname,331        "result": True,332    }333    list_hosts = MagicMock(334        return_value={ip_list[0]: {"aliases": [hostname]}},335    )336    with patch.dict(337        host.__salt__,338        {339            "hosts.list_hosts": list_hosts,340            "hosts.add_host": add_host_mock,341            "hosts.rm_host": rm_host_mock,342        },343    ):344        with patch.dict(host.__opts__, {"test": True}):345            ret = host.present(hostname, ip_list[:1])346            assert ret == expected347def test_host_present_should_return_None_if_test_and_adding():348    add_host_mock = MagicMock(return_value=True)349    rm_host_mock = MagicMock(return_value=True)350    expected = {351        "comment": "\n".join(352            ["Host {} ({}) already present", "Host 
{} ({}) would be added"]353        ).format(354            hostname,355            ip_list[0],356            hostname,357            ip_list[1],358        ),359        "changes": {"added": {ip_list[1]: [hostname]}},360        "name": hostname,361        "result": None,362    }363    list_hosts = MagicMock(364        return_value={ip_list[0]: {"aliases": [hostname]}},365    )366    with patch.dict(367        host.__salt__,368        {369            "hosts.list_hosts": list_hosts,370            "hosts.add_host": add_host_mock,371            "hosts.rm_host": rm_host_mock,372        },373    ):374        with patch.dict(host.__opts__, {"test": True}):375            ret = host.present(hostname, ip_list)376            assert ret == expected377def test_host_present_should_return_None_if_test_and_removing():378    add_host_mock = MagicMock(return_value=True)379    rm_host_mock = MagicMock(return_value=True)380    list_hosts_mock = MagicMock(return_value=default_hosts)381    expected = {382        "comment": "\n".join(383            ["Host {} ({}) already present", "Host {} ({}) would be removed"]384        ).format(385            hostname,386            ip_list[0],387            hostname,388            ip_list[1],389        ),390        "changes": {"removed": {ip_list[1]: [hostname]}},391        "name": hostname,392        "result": None,393    }394    with patch.dict(395        host.__salt__,396        {397            "hosts.list_hosts": list_hosts_mock,398            "hosts.add_host": add_host_mock,399            "hosts.rm_host": rm_host_mock,400        },401    ):402        with patch.dict(host.__opts__, {"test": True}):403            ret = host.present(hostname, ip_list[:1], clean=True)404            assert ret == expected405def test_absent():406    """407    Test to ensure that the named host is absent408    """409    ret = {410        "changes": {},411        "comment": "Host salt (127.0.0.1) already absent",412        "name": "salt",413        "result": True,414    
}415    mock = MagicMock(return_value=False)416    with patch.dict(host.__salt__, {"hosts.has_pair": mock}):417        assert host.absent("salt", "127.0.0.1") == ret418def test_only_already():419    """420    Test only() when the state hasn't changed421    """422    expected = {423        "name": "127.0.1.1",424        "changes": {},425        "result": True,426        "comment": 'IP address 127.0.1.1 already set to "foo.bar"',427    }428    mock1 = MagicMock(return_value=["foo.bar"])429    with patch.dict(host.__salt__, {"hosts.get_alias": mock1}):430        mock2 = MagicMock(return_value=True)431        with patch.dict(host.__salt__, {"hosts.set_host": mock2}):432            with patch.dict(host.__opts__, {"test": False}):433                assert expected == host.only("127.0.1.1", "foo.bar")434def test_only_dryrun():435    """436    Test only() when state would change, but it's a dry run437    """438    expected = {439        "name": "127.0.1.1",440        "changes": {},441        "result": None,442        "comment": 'Would change 127.0.1.1 from "foo.bar" to "foo.bar foo"',443    }444    mock1 = MagicMock(return_value=["foo.bar"])445    with patch.dict(host.__salt__, {"hosts.get_alias": mock1}):446        mock2 = MagicMock(return_value=True)447        with patch.dict(host.__salt__, {"hosts.set_host": mock2}):448            with patch.dict(host.__opts__, {"test": True}):449                assert expected == host.only("127.0.1.1", ["foo.bar", "foo"])450def test_only_fail():451    """452    Test only() when state change fails453    """454    expected = {455        "name": "127.0.1.1",456        "changes": {},457        "result": False,458        "comment": "hosts.set_host failed to change 127.0.1.1"459        + ' from "foo.bar" to "foo.bar foo"',460    }461    mock = MagicMock(return_value=["foo.bar"])462    with patch.dict(host.__salt__, {"hosts.get_alias": mock}):463        mock = MagicMock(return_value=False)464        with patch.dict(host.__salt__, 
{"hosts.set_host": mock}):465            with patch.dict(host.__opts__, {"test": False}):466                assert expected == host.only("127.0.1.1", ["foo.bar", "foo"])467def test_only_success():468    """469    Test only() when state successfully changes470    """471    expected = {472        "name": "127.0.1.1",473        "changes": {"127.0.1.1": {"old": "foo.bar", "new": "foo.bar foo"}},474        "result": True,475        "comment": "successfully changed 127.0.1.1"476        + ' from "foo.bar" to "foo.bar foo"',477    }478    mock = MagicMock(return_value=["foo.bar"])479    with patch.dict(host.__salt__, {"hosts.get_alias": mock}):480        mock = MagicMock(return_value=True)481        with patch.dict(host.__salt__, {"hosts.set_host": mock}):482            with patch.dict(host.__opts__, {"test": False}):...nmap.py
Source:nmap.py  
1#!/usr/bin/env python32# -*- coding: utf-8 -*-3# imports4from core.database import *5from libnmap.parser import NmapParser6import re7class karma_ext():8	name     = "nmap importer"9	10	def match(self, head_str):11		""" match string in order to identify nmap xml report """12		match_str = """<nmaprun scanner="nmap" args="nmap"""13		if match_str in head_str.lower():14			return True15		return False16	def parse(self, xml, database):17		""" import an nmap xml output """18		report = NmapParser.parse_fromfile(xml)19		for host in report.hosts:20			# get os accuracy21			try:22				accuracy = str(host.os_class_probabilities()[0])23			except:24				accuracy = ""25			# get the os match26			try:27				match = str(host.os_match_probabilities()[0])28			except:29				match = ""30			# get the first hostname31			try:32				hostname = host.hostnames[0]33			except:34				hostname = ""35			# check if the host is already in the db36			if database.host_exist(host.address):37				# update38				add_host = database.session.query(targets).filter( targets.address == host.address ).one()39				40				# update values only if there's more informations41				if len(str(host.scripts_results)) > 3:42					add_host.scripts = str(host.scripts_results)43				if len(hostname) > 0:44					if not hostname in add_host.hostname:45						# add multiple hostnames46						add_host.hostname = add_host.hostname + " " + hostname47				if len(match) > 0:48					add_host.os_match = match49				if len(accuracy) >0:50					add_host.os_accuracy = accuracy51				if len(host.ipv4) > 0:52					add_host.ipv4 = host.ipv453				if len(host.ipv6) > 0:54					add_host.ipv6 = host.ipv655				if len(host.mac) > 0:56					add_host.mac = host.mac57				if len(host.status) > 0:58					add_host.status = host.status59				if len(host.tcpsequence) > 0:60					add_host.tcpsequence = host.tcpsequence61				if len(host.vendor) > 0:62					add_host.vendor = host.vendor63				if len(str(host.uptime)) > 0:64					add_host.uptime = host.uptime65				if 
len(str(host.lastboot)) > 0:66					add_host.lastboot = host.lastboot67				if len(str(host.distance)) > 0:68					add_host.distance = host.distance69			else:70				# add the host to the db71				add_host = targets(address=host.address,scripts=str(host.scripts_results), hostname=hostname, os_match=match, os_accuracy=accuracy, ipv4=host.ipv4, ipv6=host.ipv6, mac=host.mac, status=host.status, tcpsequence=host.tcpsequence, vendor=host.vendor, uptime=host.uptime, lastboot=host.lastboot, distance=host.distance)72			73			# commit to db74			database.session.add(add_host)75			database.session.commit()76			for port in host.get_ports():77				service = host.get_service(port[0],port[1])78				if database.port_exist(add_host.id, port[0], port[1]):79					# update the existing port80					add_port = database.session.query(services).filter( services.host_id == add_host.id, services.port == port[0], services.protocol == port[1] ).one()81					if len(service.service) > 0:82						add_port.service = service.service83					if len(service.servicefp) > 0:84						add_port.fingerprint = str(service.servicefp)85					#print(service.servicefp)86					if len(service.state) > 0:87						add_port.state = service.state88					if len(service.banner) > 0:89						#print(service.banner)90						nb = re.sub(r'[A-z]+?:\s','', service.banner)91						add_port.banner = nb92				else:93					# add the new port94					add_port = services(port=port[0], protocol=port[1], service=service.service, fingerprint=service.servicefp, state=service.state, banner=service.banner, host = add_host)95				# commit to db96				database.session.add(add_port)...shodan.py
Source:shodan.py  
1#!/usr/bin/env python32# -*- coding: utf-8 -*-3# imports4from core.database import *5import json6class karma_ext():7	name     = "shodan.io importer"8	9	def match(self, head_str):10		""" match string in order to identify nmap xml report """11		if ", \"postal_code\": " in head_str:12			return True13		return False14	def parse(self, json_file, database):15		""" import smap.py json output """16		file = open(json_file,'r')17		sp_out = file.read()18		file.close()19		smap_out = json.loads(sp_out)20		for host in smap_out:21			# get the os match22			if smap_out[host]["os"]:23				match = smap_out[host]["os"]24			else:25				match = ""26			# get the first hostname27			try:28				hostname = smap_out[host]["hostnames"][0]29			except:30				hostname = ""31			# check if the host is already in the db32			if database.host_exist(host):33				# update34				add_host = database.session.query(targets).filter( targets.address == host ).one()35				36				# update values only if there's more informations37				if len(hostname) > 0:38					if not hostname in add_host.hostname:39						# add multiple hostnames40						add_host.hostname = add_host.hostname + " " + hostname41				if len(match) > 0:42					add_host.os_match = match43				if len(add_host.status) > 0:44					add_host.status = "up"45				add_host.isp = smap_out[host]["isp"]46				add_host.country_name =  smap_out[host]["country_name"]47				add_host.country_code = smap_out[host]["country_code"]48				add_host.organization = smap_out[host]["org"]49				add_host.latitude = smap_out[host]["latitude"]50				add_host.longitude = smap_out[host]["longitude"]51			else:52				# add the host to the db53				add_host = targets(address=host, latitude=smap_out[host]["latitude"],longitude= smap_out[host]["longitude"],hostname=hostname, os_match=match, status="up", country_code = smap_out[host]["country_code"], country_name = smap_out[host]["country_name"], organization = smap_out[host]["org"], isp = smap_out[host]["isp"])54			55			# commit to db56			
database.session.add(add_host)57			database.session.commit()58			i = 059			for port in smap_out[host]["ports"]:60				service = database._find_nmap_service(port,smap_out[host]["data"][i]["transport"])61				if database.port_exist(add_host.id, port, smap_out[host]["data"][i]["transport"]):62					# update the existing port63					add_port = database.session.query(services).filter( services.host_id == add_host.id, services.port == port, services.protocol == smap_out[host]["data"][i]["transport"] ).one()64					if len(service) > 0:65						add_port.service = service66				else:67					# add the new port68					add_port = services(port=port, protocol=smap_out[host]["data"][i]["transport"], service=service, fingerprint="", state="open", banner="", host = add_host)69				# commit to db70				database.session.add(add_port)71				i += 1...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
