Best Python code snippet using localstack_python
_ssh_server.py
Source:_ssh_server.py  
1from ipaddress import ip_address2import logging3from re import Pattern4import random5import datetime6from typing import Optional, Set, Tuple7from paramiko.common import (AUTH_FAILED, AUTH_SUCCESSFUL, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED,8                             OPEN_SUCCEEDED)9import paramiko10from paramiko.channel import Channel11from frontend.honeylogger import SSHSession12from frontend.config import config13from ._proxy_handler import ProxyHandler14logger = logging.getLogger(__name__)15class Server(paramiko.ServerInterface):16    """Server implements the ServerInterface from paramiko17    :param paramiko: The SSH server interface18    """19    # The set of channels that have successfully issued a shell or exec request20    _channels_done: Set[int]21    def __init__(22            self,23            transport: paramiko.Transport,24            session: SSHSession,25            proxy_handler: ProxyHandler,26            usernames: Optional[Pattern],27            passwords: Optional[Pattern]) -> None:28        super().__init__()29        self._transport = transport30        self._usernames = usernames31        self._passwords = passwords32        self._session = session33        self._proxy_handler = proxy_handler34        self._channels_done = set()35        self._last_activity = datetime.datetime.now()36        self._logging_session_started = False37    def get_last_activity(self) -> datetime.datetime:38        """Get the datetime of the last activity39        :return: datetime of the last activity40        """41        return self._last_activity42    def _update_last_activity(self) -> None:43        """Updates the last activity seen"""44        self._last_activity = datetime.datetime.now()45        # Make sure to start the logging session if it isn't started46        if not self._logging_session_started:47            try:48                self._session.set_remote_version(self._transport.remote_version)49                self._session.begin()50                self._logging_session_started = True51            except Exception as exc:52                logger.exception("Failed to start the SSH logging session", exc_info=exc)53                raise54    def check_auth_password(self, username: str, password: str) -> int:55        self._update_last_activity()56        self._session.log_login_attempt(username, password)57        # Verify the login attempt against our username and password regex58        if self._usernames is not None and not self._usernames.match(username):59            return AUTH_FAILED60        if self._passwords is not None and not self._passwords.match(password):61            return AUTH_FAILED62        # Check if we have the LOGIN_SUCCESS_RATE set and apply it if we do63        if config.SSH_LOGIN_SUCCESS_RATE != -1:64            if random.randint(1, 100) > config.SSH_LOGIN_SUCCESS_RATE:65                return AUTH_FAILED66        self._proxy_handler.set_attacker_credentials(username, password)67        return AUTH_SUCCESSFUL68    def check_auth_publickey(self, username: str, key: paramiko.PKey) -> int:69        self._update_last_activity()70        return AUTH_FAILED71    def get_allowed_auths(self, username: str) -> str:72        self._update_last_activity()73        return 'password'74    def check_channel_request(self, kind: str, chanid: int) -> int:75        self._update_last_activity()76        logger.info("%s Channel request (id: %s, kind: %s)",77                    self._session, chanid, kind)78        if kind == "session":79            return (self._proxy_handler.create_backend_connection()80                    and self._proxy_handler.open_channel(kind, chanid))81        return OPEN_SUCCEEDED82    def check_channel_shell_request(self, channel: paramiko.Channel) -> bool:83        self._update_last_activity()84        logger.info("%s Shell request for channel %s",85                    self._session, channel.chanid)86        if channel.chanid in self._channels_done or not self._proxy_handler.handle_shell_request(87                channel):88            return False89        self._channels_done.add(channel.chanid)90        return True91    def check_channel_exec_request(self, channel: paramiko.Channel, command: bytes) -> bool:92        self._update_last_activity()93        if channel.chanid in self._channels_done or not self._proxy_handler.handle_exec_request(94                channel, command):95            return False96        self._channels_done.add(channel.chanid)97        return True98    def check_channel_pty_request(self, channel: paramiko.Channel, term: bytes,99                                  width: int, height: int, pixelwidth: int,100                                  pixelheight: int, _: bytes) -> bool:101        self._update_last_activity()102        logger.info("%s Pty request on channel %s",103                    self._session, channel.chanid)104        try:105            term_string = term.decode("utf-8")106            self._session.log_pty_request(term_string, width, height, pixelwidth, pixelheight)107        except UnicodeError:108            logger.exception("%s Pty request failed to decode the term %s to utf8",109                             self._session, term)110            return False111        return self._proxy_handler.handle_pty_request(112            channel, term_string, width, height, pixelwidth, pixelheight)113    def check_channel_window_change_request(self, channel: Channel, width: int, height: int,114                                            pixelwidth: int, pixelheight: int) -> bool:115        self._update_last_activity()116        return self._proxy_handler.handle_window_change_request(117            channel, width, height, pixelwidth, pixelheight)118    def check_channel_env_request(self, channel: Channel, name: bytes, value: bytes) -> bool:119        self._update_last_activity()120        try:121            name_string = name.decode("utf-8")122            value_string = value.decode("utf-8")123        except UnicodeDecodeError:124            logger.error(125                "%s Env request failed to decode the values (name: %s, value: %s)",126                self._session, name, value)127            return False128        self._session.log_env_request(channel.chanid, name_string, value_string)129        return False130    def check_channel_direct_tcpip_request(131            self, chanid: int, origin: Tuple[str, int],132            destination: Tuple[str, int]) -> int:133        self._update_last_activity()134        try:135            ip = ip_address(origin[0])136        except ValueError:137            logger.error("%s Direct TCPIP request failed to decode the origin IP %s",138                         self._session, origin[0])139            return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED140        self._session.log_direct_tcpip_request(141            chanid, ip, origin[1],142            destination[0],143            destination[1])144        return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED145    def check_channel_x11_request(146            self, channel: Channel, single_connection: bool, auth_protocol: str, auth_cookie: bytes,147            screen_number: int) -> bool:148        self._update_last_activity()149        self._session.log_x11_request(channel.chanid, single_connection,150                                      auth_protocol, memoryview(auth_cookie), screen_number)151        return False152    def check_channel_forward_agent_request(self, channel: Channel) -> bool:153        self._update_last_activity()154        logger.info("%s Forward agent request on channel %s",155                    self._session, channel.chanid)156        return False157    def check_port_forward_request(self, address: str, port: int) -> int:158        self._update_last_activity()159        self._session.log_port_forward_request(address, port)...test_ssh_server.py
Source:test_ssh_server.py  
1import datetime2import re3import logging4import pytest5from ipaddress import ip_address6from unittest.mock import MagicMock, create_autospec7import paramiko8from paramiko.common import (AUTH_FAILED, AUTH_SUCCESSFUL, )9from frontend.protocols.ssh._proxy_handler import ProxyHandler10from frontend.honeylogger import SSHSession11from frontend.honeylogger._console import ConsoleLogSSHSession12from frontend.protocols.ssh._ssh_server import Server13import frontend.protocols.ssh._ssh_server14from frontend.target_systems import TargetSystemProvider15debug_log = logging.getLogger(frontend.protocols.ssh._ssh_server.__name__)16def time_in_range(start, end, x):17    if start <= end:18        return start <= x <= end19    else:20        return start <= x or x <= end21@pytest.fixture()22def logger() -> SSHSession:23    return ConsoleLogSSHSession(ip_address("1.1.1.1"), 1, ip_address("2.2.2.2"), 2)24@pytest.fixture()25def target_system_provider(logger) -> ProxyHandler:26    return create_autospec(TargetSystemProvider)27@pytest.fixture()28def proxy_handler(logger, target_system_provider) -> ProxyHandler:29    return ProxyHandler(logger, target_system_provider)30@pytest.fixture()31def ssh_server(logger, proxy_handler) -> Server:32    transport_mock = MagicMock()33    transport_mock.remote_version = "ExampleVersion"34    return Server(transport_mock, logger, proxy_handler, re.compile(""), re.compile(""))35def test_update_last_activity_without_started_session(ssh_server: Server, logger: SSHSession):36    logger.begin = MagicMock()37    now = datetime.datetime.now()38    ssh_server._update_last_activity()39    logger.begin.assert_called_once()40    assert ssh_server._logging_session_started41    assert time_in_range(now,  datetime.datetime.now(), ssh_server._last_activity)42def test_update_last_activity_exception(ssh_server: Server, logger: SSHSession):43    logger.begin = MagicMock(side_effect=Exception)44    now = datetime.datetime.now()45    with pytest.raises(Exception):46        ssh_server._update_last_activity()47    logger.begin.assert_called_once()48    assert ssh_server._logging_session_started == False49    assert time_in_range(now,  datetime.datetime.now(), ssh_server._last_activity)50def test_get_last_activity(ssh_server: Server):51    time = datetime.datetime.now()52    ssh_server._last_activity = time53    assert ssh_server.get_last_activity() == time54def test_check_username_password(ssh_server: Server):55    ssh_server._usernames = re.compile(r"linus")56    ssh_server._passwords = re.compile(r"torvalds")57    now = datetime.datetime.now()58    assert ssh_server.check_auth_password("linus", "torvalds") == AUTH_SUCCESSFUL59    assert ssh_server.check_auth_password("inus", "torvalds") == AUTH_FAILED60    assert ssh_server.check_auth_password("linus", "lol") == AUTH_FAILED61    assert ssh_server.check_auth_password("inus", "") == AUTH_FAILED62    assert ssh_server.check_auth_password("", "") == AUTH_FAILED63    assert ssh_server.check_auth_password("Linus", "") == AUTH_FAILED64    ssh_server._usernames = None65    assert ssh_server.check_auth_password("", "torvalds") == AUTH_SUCCESSFUL66    assert ssh_server.check_auth_password("TSRA%$#B$WQ", "torvalds") == AUTH_SUCCESSFUL67    ssh_server._usernames = None68    ssh_server._passwords = None69    assert ssh_server.check_auth_password("TSRA%$#B$WQ", "") == AUTH_SUCCESSFUL70    assert ssh_server.check_auth_password("", "") == AUTH_SUCCESSFUL71    assert time_in_range(now,  datetime.datetime.now(), ssh_server._last_activity)72def test_check_auth_publickey(ssh_server: Server):73    now = datetime.datetime.now()74    key = paramiko.RSAKey(filename="./host.key")75    assert ssh_server.check_auth_publickey("", key) == AUTH_FAILED76    assert time_in_range(now,  datetime.datetime.now(), ssh_server._last_activity)77def test_check_channel_request_and_exec_once_per_channel(ssh_server: Server):78    now = datetime.datetime.now()79    cmd = b"ls -la"80    ssh_server._channels_done = set([1, 2])81    ssh_server._proxy_handler.handle_exec_request = MagicMock(return_value=True)82    assert ssh_server.check_channel_exec_request(paramiko.Channel(1), cmd) == False83    assert ssh_server.check_channel_shell_request(paramiko.Channel(2)) == False84    channel = paramiko.Channel(3)85    assert ssh_server.check_channel_exec_request(channel, cmd) == True86    assert ssh_server.check_channel_shell_request(channel) == False87    ssh_server._proxy_handler.handle_exec_request.assert_called_once_with(channel, cmd)88    ssh_server._proxy_handler.handle_exec_request.assert_called_once()89    assert time_in_range(now,  datetime.datetime.now(), ssh_server._last_activity)90def test_channel_pty_request(ssh_server: Server):91    now = datetime.datetime.now()92    ssh_server._session.log_pty_request = MagicMock()93    ssh_server._proxy_handler.handle_pty_request = MagicMock(return_value=True)94    channel = paramiko.Channel(3)95    term = b"screen"96    height = 43297    width = 3298    pixel_width = 43299    pixel_height = 432100    ssh_server.check_channel_pty_request(101        channel, term, width, height, pixel_width, pixel_height, b"")102    ssh_server._session.log_pty_request.assert_called_with(103        term.decode("utf-8"), width, height, pixel_width, pixel_height)104    ssh_server._proxy_handler.handle_pty_request.assert_called_once_with(105        channel, term.decode("utf-8"), width, height, pixel_width, pixel_height)...tracker.py
Source:tracker.py  
1import urllib.parse2import urllib.request3from typing import Dict, Sequence, Union4import btorrent.transport.tracker as transport5class Tracker:6    def __init__(self, tracker_addr: bytes, proxies: Dict[str, str] = None) -> None:7        self.tracker_addr = urllib.parse.urlparse(tracker_addr)8        self.proxies: Dict[str, str] = proxies or {}9        self._proxy_handler = urllib.request.ProxyHandler(self.proxies)10        self.interval = 011        self.last_announce_time = 012    def __hash__(self):13        return hash(self.tracker_addr)14    def __eq__(self, other: 'Tracker') -> bool:15        return self.tracker_addr == other.tracker_addr16    def set_proxies(self, proxies: Dict[str, str]) -> None:17        self.proxies = proxies18        self._proxy_handler.__init__(self.proxies)19    def _send_request(self, req: transport.Request) -> transport.Response:20        opener = transport.build_opener(self._proxy_handler)21        return opener.open(req.get_req())22    def announce(self,23                 info_hash: bytes,24                 peer_id: bytes,25                 left: int,26                 downloaded=0,27                 uploaded=0,28                 event=transport.AnnounceEvent.NONE,29                 key=-1,30                 numwant=-1,31                 ip=0,32                 port=6881,33                 udp_port=8881) -> Union[transport.Response, transport.AnnounceResponse]:34        if self.tracker_addr.scheme == b'udp':35            port = udp_port36        return self._send_request(transport.AnnounceRequest(37            self.tracker_addr, info_hash, peer_id,38            downloaded, left, uploaded, event,39            key, numwant, ip, port40        ))41    def scrape(self, info_hashes: Sequence[bytes] = ()) -> Union[transport.Response, transport.ScrapeResponse]:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
