Best Python code snippet using pyatom_python
proc.py
Source:proc.py  
# coding: utf-8
from logging import getLogger
from ptrlib.util.encoding import *
from ptrlib.pwn.tube import *
import errno
import select
import fcntl
import os
import subprocess

logger = getLogger(__name__)


class Process(Tube):
    """Tube that talks to a child process through its stdin/stdout pipes."""

    def __init__(self, args, env=None, cwd=None, timeout=None):
        """Create a process

        Create a new process and make a pipe.

        Args:
            args (list)    : The arguments to pass (a single non-list value
                             is treated as the executable path)
            env            : The environment variables, handed to
                             ``subprocess.Popen`` unchanged
            cwd            : Working directory, handed to
                             ``subprocess.Popen`` unchanged
            timeout (int)  : Default timeout (in second) used by the
                             receive methods

        Returns:
            Process: ``Process`` instance.
        """
        if isinstance(args, list):
            self.args = args
            self.filepath = args[0]
        else:
            self.args = [args]
            self.filepath = args
        self.env = env
        self.timeout = timeout
        self.temp_timeout = None
        # Bytes already read from the pipe but not yet handed to the caller
        self.reservoir = b''
        self.proc = None

        # Create a new process
        try:
            self.proc = subprocess.Popen(
                self.args,
                cwd=cwd,
                env=self.env,
                shell=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.PIPE
            )
        except FileNotFoundError:
            # `self.proc` stays None; the other methods check for that.
            logger.warning("Executable not found: '{0}'".format(self.filepath))
            return

        # Set in non-blocking mode
        fd = self.proc.stdout.fileno()
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        logger.info("Successfully created new process (PID={})".format(self.proc.pid))

    def _settimeout(self, timeout):
        """Select the timeout for the next operation.

        ``None`` falls back to the default given to the constructor.
        """
        if timeout is None:
            self.temp_timeout = self.timeout
        else:
            self.temp_timeout = timeout

    def _poll(self):
        """Check whether the child has exited.

        Returns:
            ``False`` if there is no process, ``None`` while it is still
            running, otherwise its exit code (after which ``self.proc`` is
            reset to ``None``).
        """
        if self.proc is None:
            return False
        self.proc.poll()
        returncode = self.proc.returncode
        if returncode is not None:
            logger.error(
                "Process '{}' stopped with exit code {} (PID={})".format(
                    self.filepath, returncode, self.proc.pid
                ))
            self.proc = None
        return returncode

    def _is_alive(self):
        """Return True only while the child process is still running."""
        return self._poll() is None

    def _can_recv(self):
        """Wait up to the current timeout for stdout to become readable.

        Returns:
            bool: True if data can be read from the child's stdout.
        """
        if self.proc is None:
            return False
        try:
            readable, _, _ = select.select(
                [self.proc.stdout], [], [], self.temp_timeout
            )
        except select.error as err:
            # On Python 3 `select.error` is OSError, which is not
            # subscriptable: use `.errno` instead of `err[0]`.
            if err.errno != errno.EINTR:
                logger.warning("select failed: {0}".format(err))
            return False
        return bool(readable)

    def recv(self, size=4096, timeout=None):
        """Receive raw data

        Receive raw data of maximum `size` bytes length through the pipe.

        Args:
            size    (int): The data size to receive
            timeout (int): Timeout (in second)

        Returns:
            bytes: The received data. ``b''`` when nothing arrived before
            the timeout and nothing is buffered, ``None`` on invalid `size`
            or when the pipe was readable but yielded no data.
        """
        self._settimeout(timeout)
        if size <= 0:
            logger.error("`size` must be larger than 0")
            return None
        self._poll()
        if size <= len(self.reservoir):
            # Use the buffer
            data = self.reservoir[:size]
            self.reservoir = self.reservoir[size:]
            return data
        if not self._can_recv():
            # Nothing new on the pipe: hand out whatever is still buffered
            # (this is b'' when the buffer is empty).
            data = self.reservoir
            self.reservoir = b''
            return data
        # stdout is non-blocking, so read() may return None instead of bytes.
        chunk = self.proc.stdout.read()
        if chunk:
            self.reservoir += chunk
        if len(self.reservoir) == 0:
            # No data received
            return None
        # Slicing covers both "too much" and "too little" data.
        data = self.reservoir[:size]
        self.reservoir = self.reservoir[size:]
        return data

    def recvonce(self, size=4, timeout=None):
        """Receive raw data

        Receive raw data of `size` bytes length through the pipe.

        Args:
            size    (int): The data size to receive
            timeout (int): Timeout (in second)

        Returns:
            bytes: The received data, or ``None`` if `size` is invalid or
            the data could not be received completely.
        """
        self._settimeout(timeout)
        data = b''
        if size <= 0:
            logger.error("`size` must be larger than 0")
            return None
        while len(data) < size:
            recv_data = self.recv(size - len(data), timeout)
            if recv_data is None:
                return None
            elif recv_data == b'':
                logger.error("Received nothing")
                return None
            # Progress is measured by the total collected so far; adding the
            # cumulative length on every pass would end the loop too early.
            data += recv_data
        return data

    def send(self, data, timeout=None):
        """Send raw data

        Send raw data through the pipe to the child's stdin.

        Args:
            data (bytes) : Data to send (``str`` is converted to bytes)
            timeout (int): Timeout (in second)
        """
        self._settimeout(timeout)
        if isinstance(data, str):
            data = str2bytes(data)
        if self.proc is None:
            # Never started, or already reaped by _poll().
            logger.warning("Process is not running")
            return
        try:
            self.proc.stdin.write(data)
            self.proc.stdin.flush()
        except IOError:
            logger.warning("Broken pipe")

    def close(self):
        """Close the process

        Kill the child process if there is one.
        """
        if self.proc:
            self.proc.kill()
            logger.info("close: '{0}' killed".format(self.filepath))

    def __del__(self):
        ...
sock.py
Source:sock.py  
# coding: utf-8
from logging import getLogger
from ptrlib.util.encoding import *
from ptrlib.pwn.tube import *
import socket

logger = getLogger(__name__)


class Socket(Tube):
    """Tube that talks to a remote host over a TCP (AF_INET) socket."""

    def __init__(self, host, port, timeout=None):
        """Create a socket

        Create a new socket and establish a connection to the host.

        Args:
            host (str)   : The host name or ip address of the server
            port (int)   : The port number
            timeout (int): Default timeout (in second) for send/receive

        Returns:
            Socket: ``Socket`` instance.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        # Create a new socket
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Establish a connection
        try:
            self.sock.connect((self.host, self.port))
        except ConnectionRefusedError:
            logger.warning("Connection to {0}:{1} refused".format(self.host, self.port))
        else:
            logger.info("Successfully connected to {0}:{1}".format(self.host, self.port))

    def _settimeout(self, timeout):
        """Apply `timeout` to the socket, or the default when it is None."""
        if timeout is None:
            self.sock.settimeout(self.timeout)
        else:
            self.sock.settimeout(timeout)

    def recv(self, size=4096, timeout=None):
        """Receive raw data

        Receive raw data of maximum `size` bytes length through the socket.

        Args:
            size    (int): The data size to receive
            timeout (int): Timeout (in second)

        Returns:
            bytes: The received data, or ``None`` on invalid `size`, on
            timeout, or when no data was received.
        """
        self._settimeout(timeout)
        if size <= 0:
            logger.error("`size` must be larger than 0")
            return None
        try:
            data = self.sock.recv(size)
        except socket.timeout:
            return None
        # No data received
        if len(data) == 0:
            data = None
        return data

    def recvonce(self, size=4, timeout=None):
        """Receive raw data at once

        Receive raw data of `size` bytes length through the socket.

        Args:
            size    (int): The data size to receive
            timeout (int): Timeout (in second)

        Returns:
            bytes: The received data, or ``None`` on invalid `size`, on
            timeout, or if the peer closed the connection first.
        """
        self._settimeout(timeout)
        data = b''
        if size <= 0:
            logger.error("`size` must be larger than 0")
            return None
        try:
            while len(data) < size:
                chunk = self.sock.recv(size - len(data))
                if not chunk:
                    # recv() returning b'' means the peer closed the
                    # connection; without this check the loop never ends.
                    logger.error("Connection closed")
                    return None
                data += chunk
        except socket.timeout:
            logger.error("Timeout")
            return None
        return data

    def send(self, data, timeout=None):
        """Send raw data

        Send raw data through the socket

        Args:
            data (bytes) : Data to send (``str`` is converted to bytes)
            timeout (int): Timeout (in second)
        """
        self._settimeout(timeout)
        if isinstance(data, str):
            data = str2bytes(data)
        try:
            # sendall() keeps writing until every byte is sent; send() may
            # stop after a partial write.
            self.sock.sendall(data)
        except BrokenPipeError:
            logger.warning("Broken pipe")

    def close(self):
        """Close the socket

        Close the socket.
        """
        self.sock.close()
        logger.info("Connection to {0}:{1} closed".format(self.host, self.port))

    def __del__(self):
        ...
o_tcp.py
Source:o_tcp.py  
#!/usr/bin/env python3
#/***************************************************************************//**
#@file			o_tcp.py
#
#@author		Black-Blade
#@brief			o_tcp.py
#@date    		13.01.2021
#@version		0.0.1 Added Doxygen style and created this file
#@see           https://tools.ietf.org/html/rfc1035
#*******************************************************************************/
import socket
# IMPORT MY STANDARD CLASS
from log import logging
from config import Config
# This module is not meant to be run as a script.
if __name__ == "__main__":
    quit()
class OUTPUT_TCP:
    """Client that sends one length-prefixed message per TCP connection."""
#/*******************************************************************************
# @author       Black-Blade
# @brief        Constructor of OUTPUT_TCP
# @date         10.03.2021
# @param        [tcpserver(String(),port(int),timeout(float)]
# @return
# @version      0.0.1 Added Doxygen style and created this file
# @see
# *******************************************************************************/
    def __init__(self,tcpserver=None):
        """Store the target address and timeout.

        With ``tcpserver=None`` the values are read from ``Config``;
        otherwise ``tcpserver`` is unpacked as ``(ip, port, timeout)``.
        """
        logging.debug ("")
        if tcpserver is None:
            self._ip=Config.O_TCPSERVER
            self._port=Config.O_TCPPORT
            self._settimeout =Config.O_TCPTIMEOUT
        else:
            ip,port,timeout = tcpserver
            self._ip=ip
            self._port=port
            self._settimeout =timeout
        # Maximum number of bytes requested from recv() in send()
        self._buffersize =1024

#/*******************************************************************************
# @author       Black-Blade
# @brief        Destructor of OUTPUT_TCP
# @date         06.03.2021
# @param
# @return
# @version      0.0.1 Added Doxygen style and created this file
# @see
# *******************************************************************************/
    def __del__(self):
        """Only emit a debug log entry."""
        logging.debug ("")
#/*******************************************************************************
# @author       Black-Blade
# @brief        Send data to external DNS TCP server
# @date         06.03.2021
# @param        [txdata (data to dns server)]
# @return       [rxdata (data from dns server)]
# @version      0.0.1 Added Doxygen style and created this file
##@see          https://tools.ietf.org/html/rfc1035
# *******************************************************************************/
    def send(self,txdata):
        """Send ``txdata`` to the configured server and read one reply.

        The payload is prefixed with its length as a 2-byte big-endian
        integer; the first two bytes of the reply are dropped. An
        ``OSError`` is logged, leaving ``rxdata`` as ``None``.

        NOTE(review): the rest of this method is cut off in this excerpt;
        the header comment says it returns ``rxdata`` -- confirm.
        """
        logging.debug ("")
        rxdata =None

        try:
            # Add 2 bytes for the length, as used by the TCP protocol
            txdata= (len(txdata)).to_bytes(2, byteorder="big") + txdata

            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(self._settimeout)

            s.connect((self._ip, self._port))
            logging.debug ("TCP output send to :"+str( self._ip)+":"+str( self._port))
            s.send(txdata)
            # A single recv() call: at most _buffersize bytes are read
            rxdata = s.recv(self._buffersize)
            logging.debug ("TCP incomming fom :"+str( self._ip)+":"+str( self._port))

            # NOTE(review): not reached if an exception is raised above
            s.close()
            # drop the first 2 bytes (the data length)
            rxdata =rxdata[2:]

        except OSError as err:
            logging.error("OS error: {0}".format(err))
        ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
