Best Python code snippet using autotest_python
test_roslaunch_pmon.py
Source:test_roslaunch_pmon.py  
...65        pass66        67    def unregister(self, p):68        self.procs.remove(p)69    def has_process(self, name):70        return len([p for p in self.procs if p.name == name]) > 071    def get_process(self, name):72        return self.procs.get(p, None)73    def has_main_thread_jobs(self):74        return False75    76    def do_main_thread_jobs(self):77        pass78    79    def kill_process(self, name):80        pass81        82    def shutdown(self):83        pass84        85    def get_active_names(self):86        return [p.name for p in self.procs]87    def get_process_names_with_spawn_count(self):88        actives = [(p.name, p.spawn_count) for p in self.procs]89        deads = []90        retval = [actives, deads]91    def mainthread_spin_once(self):92        pass93        94    def mainthread_spin(self):95        pass96    def run(self):97        pass98            99class ProcessMock(roslaunch.pmon.Process):100    def __init__(self, package, name, args, env, respawn=False):101        super(ProcessMock, self).__init__(package, name, args, env, respawn)102        self.stopped = False103    def stop(self, errors):104        self.stopped = True105        106class RespawnOnceProcessMock(ProcessMock):107    def __init__(self, package, name, args, env, respawn=False):108        super(ProcessMock, self).__init__(package, name, args, env, respawn)109        self.spawn_count = 0110    def is_alive(self):111        return False112    113    def start(self):114        self.spawn_count += 1115        if self.spawn_count > 1:116            self.respawn = False117## Test roslaunch.server118class TestRoslaunchPmon(unittest.TestCase):119    def setUp(self):120        self.pmon = roslaunch.pmon.ProcessMonitor()121    ## test all apis of Process instance. 
part coverage/sanity test122    def _test_Process(self, p, package, name, args, env, respawn):123        self.assertEquals(package, p.package)124        self.assertEquals(name, p.name)125        self.assertEquals(args, p.args)        126        self.assertEquals(env, p.env)  127        self.assertEquals(respawn, p.respawn)128        self.assertEquals(0, p.spawn_count)        129        self.assertEquals(None, p.exit_code)130        self.assert_(p.get_exit_description())131        self.failIf(p.is_alive())132        info = p.get_info()133        self.assertEquals(package, info['package'])134        self.assertEquals(name, info['name'])135        self.assertEquals(args, info['args'])        136        self.assertEquals(env, info['env'])  137        self.assertEquals(respawn, info['respawn'])138        self.assertEquals(0, info['spawn_count'])        139        self.failIf('exit_code' in info)140        p.start()141        self.assertEquals(1, p.spawn_count)142        self.assertEquals(1, p.get_info()['spawn_count']) 143        p.start()        144        self.assertEquals(2, p.spawn_count)145        self.assertEquals(2, p.get_info()['spawn_count'])146        # noop147        p.stop()148        p.exit_code = 0149        self.assertEquals(0, p.get_info()['exit_code'])150        self.assert_('cleanly' in p.get_exit_description())151        p.exit_code = 1152        self.assertEquals(1, p.get_info()['exit_code'])                        153        self.assert_('[exit code 1]' in p.get_exit_description())154    ## tests to make sure that our Process base class has 100% coverage155    def test_Process(self):156        from roslaunch.pmon import Process157        # test constructor params158        respawn = False159        package = 'foo-%s'%time.time()160        name = 'name-%s'%time.time()161        args = [time.time(), time.time(), time.time()]162        env = { 'key': time.time(), 'key2': time.time() } 163        p = Process(package, name, args, env)164        
self._test_Process(p, package, name, args, env, False)165        p = Process(package, name, args, env, True)166        self._test_Process(p, package, name, args, env, True) 167        p = Process(package, name, args, env, False)168        self._test_Process(p, package, name, args, env, False) 169        170    def _test_DeadProcess(self, p0, package, name, args, env, respawn):171        from roslaunch.pmon import DeadProcess172        p0.exit_code = -1173        dp = DeadProcess(p0)174        self.assertEquals(package, dp.package)175        self.assertEquals(name, dp.name)176        self.assertEquals(args, dp.args)        177        self.assertEquals(env, dp.env)  178        self.assertEquals(respawn, dp.respawn)179        self.assertEquals(0, dp.spawn_count)        180        self.assertEquals(-1, dp.exit_code)181        self.failIf(dp.is_alive())182        info = dp.get_info()183        info0 = p0.get_info()184        self.assertEquals(info0['package'], info['package'])185        self.assertEquals(info0['name'], info['name'])186        self.assertEquals(info0['args'], info['args'])        187        self.assertEquals(info0['env'], info['env'])  188        self.assertEquals(info0['respawn'], info['respawn'])189        self.assertEquals(0, info['spawn_count'])        190        try:191            dp.start()192            self.fail("should not be able to start a dead process")193        except: pass194            195        # info should be frozen196        p0.package = 'dead package'197        p0.name = 'dead name'        198        p0.spawn_count = 1199        self.assertEquals(package, dp.package)200        self.assertEquals(name, dp.name)201        self.assertEquals(0, dp.spawn_count)                202        self.assertEquals(package, dp.get_info()['package']) 203        self.assertEquals(name, dp.get_info()['name']) 204        self.assertEquals(0, dp.get_info()['spawn_count']) 205        p0.start()        206        self.assertEquals(0, dp.spawn_count)207     
   self.assertEquals(0, dp.get_info()['spawn_count'])208        # noop209        p0.stop()210    def test_DeadProcess(self):211        from roslaunch.pmon import Process, DeadProcess212        # test constructor params213        respawn = False214        package = 'foo-%s'%time.time()215        name = 'name-%s'%time.time()216        args = [time.time(), time.time(), time.time()]217        env = { 'key': time.time(), 'key2': time.time() } 218        p = Process(package, name, args, env)219        self._test_DeadProcess(p, package, name, args, env, False)220        p = Process(package, name, args, env, True)221        self._test_DeadProcess(p, package, name, args, env, True) 222        p = Process(package, name, args, env, False)223        self._test_DeadProcess(p, package, name, args, env, False) 224    def test_start_shutdown_process_monitor(self):225        def failer():226            raise Exception("oops")227        # noop228        self.failIf(roslaunch.pmon.shutdown_process_monitor(None))229        # test with fake pmon so we can get branch-complete230        pmon = ProcessMonitorMock()231        # - by setting alive to true, shutdown fails, though it can't really do anything about it232        pmon.alive = True233        self.failIf(roslaunch.pmon.shutdown_process_monitor(pmon))234        235        # make sure that exceptions get trapped        236        pmon.shutdown = failer237        # should cause an exception during execution, but should get caught 238        self.failIf(roslaunch.pmon.shutdown_process_monitor(pmon))239        240        # Test with a real process monitor241        pmon = roslaunch.pmon.start_process_monitor()242        self.assert_(pmon.isAlive())243        self.assert_(roslaunch.pmon.shutdown_process_monitor(pmon))244        self.failIf(pmon.isAlive())245        # fiddle around with some state that would shouldn't be246        roslaunch.pmon._shutting_down = True247        pmon = roslaunch.pmon.start_process_monitor()248        if 
pmon != None:249            self.failIf(roslaunch.pmon.shutdown_process_monitor(pmon))250            self.fail("start_process_monitor should fail if during shutdown sequence")251            252    def test_pmon_shutdown(self):253        # should be noop254        roslaunch.pmon.pmon_shutdown()255        256        # start two real process monitors and kill them257        # pmon_shutdown258        pmon1 = roslaunch.pmon.start_process_monitor()259        pmon2 = roslaunch.pmon.start_process_monitor()260        self.assert_(pmon1.isAlive())261        self.assert_(pmon2.isAlive())        262        roslaunch.pmon.pmon_shutdown()263        264        self.failIf(pmon1.isAlive())265        self.failIf(pmon2.isAlive())        266        267    def test_add_process_listener(self):268        # coverage test, not a functionality test as that would be much more difficult to simulate269        from roslaunch.pmon import ProcessListener270        l = ProcessListener()271        self.pmon.add_process_listener(l)272    def test_kill_process(self):273        from roslaunch.core import RLException274        pmon = self.pmon275        # should return False276        self.failIf(pmon.kill_process('foo'))277        278        p1 = ProcessMock('foo', 'name1', [], {})279        p2 = ProcessMock('bar', 'name2', [], {})280        pmon.register(p1)281        pmon.register(p2)        282        self.failIf(p1.stopped)283        self.failIf(p2.stopped)284        self.assert_(p1.name in pmon.get_active_names())285        self.assert_(p2.name in pmon.get_active_names())286        # should fail as pmon API is string-based287        try:288            self.assert_(pmon.kill_process(p1))289            self.fail("kill_process should have thrown RLException")290        except RLException: pass291        292        self.assert_(pmon.kill_process(p1.name))293        self.assert_(p1.stopped)294        295        # - pmon should not have removed yet as the pmon thread cannot catch the death296        
self.assert_(pmon.has_process(p1.name))297        self.assert_(p1.name in pmon.get_active_names())298        299        self.failIf(p2.stopped)        300        self.assert_(p2.name in pmon.get_active_names())       301        pmon.kill_process(p2.name)302        self.assert_(p2.stopped)303        304        # - pmon should not have removed yet as the pmon thread cannot catch the death305        self.assert_(pmon.has_process(p2.name))306        self.assert_(p2.name in pmon.get_active_names())307        p3 = ProcessMock('bar', 'name3', [], {})308        def bad(x):309            raise Exception("ha ha ha")310        p3.stop = bad311        pmon.register(p3)312        # make sure kill_process is safe313        pmon.kill_process(p3.name)314        def f():315            return False316        p1.is_alive = f317        p2.is_alive = f        318        p3.is_alive = f319        # Now that we've 'killed' all the processes, we should be able320        # to run through the ProcessMonitor run loop and it should321        # exit.  
But first, important that we check that pmon has no322        # other extra state in it323        self.assertEquals(3, len(pmon.get_active_names()))324        # put pmon into exitable state325        pmon.registrations_complete()326        # and run it -- but setup a safety timer to kill it if it doesn't exit327        marker = Marker()328        thread.start_new_thread(kill_pmon, (self.pmon,marker, 10.))329        330        pmon.run()331        332        self.failIf(marker.marked, "pmon had to be externally killed")333        334        self.assert_(pmon.done)335        self.failIf(pmon.has_process(p1.name))336        self.failIf(pmon.has_process(p2.name))337        alive, dead = pmon.get_process_names_with_spawn_count()338        self.failIf(alive)339        self.assert_((p1.name, p1.spawn_count) in dead)340        self.assert_((p2.name, p2.spawn_count) in dead)341    def test_run(self):342        # run is really hard to test...343        pmon = self.pmon344        # put pmon into exitable state345        pmon.registrations_complete()346        # give the pmon a process that raises an exception when it's347        # is_alive is checked. 
this should be marked as a dead process348        p1 = ProcessMock('bar', 'name1', [], {})349        def bad():350            raise Exception('ha ha')351        p1.is_alive = bad352        pmon.register(p1)353        354        # give pmon a well-behaved but dead process355        p2 = ProcessMock('bar', 'name2', [], {})        356        def f():357            return False358        p2.is_alive = f359        pmon.register(p2)360        # give pmon a process that wants to respawn once361        p3 = RespawnOnceProcessMock('bar', 'name3', [], {})        362        pmon.register(p3)363        364        # test assumptions about pmon's internal data structures365        # before we begin test366        self.assert_(p1 in pmon.procs)367        self.assert_(pmon._registrations_complete)368        self.failIf(pmon.is_shutdown)369        370        # and run it -- but setup a safety timer to kill it if it doesn't exit371        marker = Marker()372        thread.start_new_thread(kill_pmon, (self.pmon,marker, 10.))373        374        pmon.run()375        376        self.failIf(marker.marked, "pmon had to be externally killed")        377        # retest assumptions378        self.failIf(pmon.procs)379        self.assert_(pmon.is_shutdown)        380        pmon.is_shutdown = False381                382    def test_get_process_names_with_spawn_count(self):383        p1 = ProcessMock('foo', 'name1', [], {})384        p2 = ProcessMock('bar', 'name2', [], {})385        pmon = self.pmon386        self.assertEquals([[], []], pmon.get_process_names_with_spawn_count())387        pmon.register(p1)388        self.assertEquals([[('name1', 0),], []], pmon.get_process_names_with_spawn_count())        389        pmon.register(p2)390        alive, dead = pmon.get_process_names_with_spawn_count()391        self.assertEquals([], dead)392        self.assert_(('name1', 0) in alive)393        self.assert_(('name2', 0) in alive)        394        395        import random396        
p1.spawn_count = random.randint(1, 10000)397        p2.spawn_count = random.randint(1, 10000)398        399        alive, dead = pmon.get_process_names_with_spawn_count()400        self.assertEquals([], dead)401        self.assert_((p1.name, p1.spawn_count) in alive)402        self.assert_((p2.name, p2.spawn_count) in alive)403        #TODO figure out how to test dead_list404        405    ## Tests ProcessMonitor.register(), unregister(), has_process(), and get_process()406    def test_registration(self):407        from roslaunch.core import RLException408        from roslaunch.pmon import Process409        pmon = self.pmon410        411        p1 = Process('foo', 'name1', [], {})412        p2 = Process('bar', 'name2', [], {})413        corep1 = Process('core', 'core1', [], {})        414        corep2 = Process('core', 'core2', [], {})        415        pmon.register(p1)416        self.assert_(pmon.has_process('name1'))417        self.assertEquals(p1, pmon.get_process('name1'))418        self.failIf(pmon.has_process('name2'))419        self.assertEquals(['name1'], pmon.get_active_names())420        try:421            pmon.register(Process('foo', p1.name, [], {}))422            self.fail("should not allow duplicate process name")423        except RLException: pass424        425        pmon.register(p2)426        self.assert_(pmon.has_process('name2'))427        self.assertEquals(p2, pmon.get_process('name2'))428        self.assertEquals(set(['name1', 'name2']), set(pmon.get_active_names()))429        430        pmon.register_core_proc(corep1)431        self.assert_(pmon.has_process('core1'))432        self.assertEquals(corep1, pmon.get_process('core1'))433        self.assertEquals(set(['name1', 'name2', 'core1']), set(pmon.get_active_names()))        434        pmon.register_core_proc(corep2)435        self.assert_(pmon.has_process('core2'))436        self.assertEquals(corep2, pmon.get_process('core2'))437        self.assertEquals(set(['name1', 'name2', 'core1', 
'core2']), set(pmon.get_active_names()))                438        pmon.unregister(p2)439        self.failIf(pmon.has_process('name2'))                440        pmon.unregister(p1)441        self.failIf(pmon.has_process('name1'))        442        pmon.unregister(corep1)443        self.failIf(pmon.has_process('core1'))                444        pmon.unregister(corep2)445        self.failIf(pmon.has_process('core2'))446        pmon.shutdown()447        try:448            pmon.register(Process('shutdown_fail', 'shutdown_fail', [], {}))449            self.fail("registration should fail post-shutdown")450        except RLException: pass451                      452        453        454    def test_mainthread_spin_once(self):455        # shouldn't do anything456        self.pmon.done = False457        self.pmon.mainthread_spin_once()458        self.pmon.done = True        459        self.pmon.mainthread_spin_once()...HttpSendCamera.py
Source:HttpSendCamera.py  
1from concurrent.futures.thread import ThreadPoolExecutor2from IO.camera.basler.process.img.bangcai3.process_img import process_img3from IO.camera.basler.source.basler_camera import BaslerCameras4from threading import Thread5from queue import Queue6from tools.cv2base64 import image_to_base647from  tools.wrapper import SAFE8import time9import requests10import json11# import grequests12class HttpSender():13    def __init__(self,send_url,send_thread=5,sleep_ms=150):14        super(HttpSender, self).__init__()15        self.send_url = send_url16        self.queue = Queue()17        self.send_thread = send_thread18        self.threadpool = ThreadPoolExecutor(max_workers=self.send_thread)19        print("éåé¿åº¦{}".format(self.threadpool._work_queue.qsize()))20        self.sleep_ms = sleep_ms21    # def run(self):22    #     while True:23    #         self.loop_func()24    # @SAFE25    # def loop_func(self):26    #     if self.queue.qsize() > 0:27    #         c = self.queue.get()28    #         self.threadpool.submit(self.send_img,c[0],c[code])29    #30    #     else:31    #         time.sleep(self.sleep_ms/1000)32    # def put(self,img,info):33    #     self.queue.put((img,info))34    def async_send_img(self,img,info):35        self.threadpool.submit(self.send_img,image_to_base64(img),info)36        print("éåé¿åº¦{}".format(self.threadpool._work_queue.qsize()))37    @SAFE38    def send_img(self,img_base64,info):39        data = {"data": info, "img": img_base64}40        response = requests.get(self.send_url, json=json.dumps(data))41class HttpSendCamera(BaslerCameras):42    def __init__(self,begin_id,camera_ip_list,camera_config_list,send_url,send_thread):43        super(HttpSendCamera,self).__init__(begin_id,camera_ip_list,camera_config_list)44        self.calc_percent = 0.5  # æå¤å°æ¯å¨å·¥æ§æºä¸å计ç®45        self.sender = HttpSender(send_url=send_url,send_thread=send_thread)46    def handle_img(self,image,camera_id,count):47        super(HttpSendCamera, 
self).handle_img(image, camera_id, count)48        has_process = False49        has_process, has_steel, image, left, right = self.qiebian_mod(camera_id, count, has_process, image)50        info = dict(51            camera_id=camera_id + self.begin_id,52            edge_gap=0,53            count=count,54            day_info=self.day_info,55            trigger_info=self.trigger_info,56            has_steel=has_steel,57            left=left,58            right=right,59            has_process=has_process60        )61        self.send(image,info)62    def qiebian_mod(self, camera_id, count, has_process, image):63        if count % 10 < self.calc_percent * 10:64            image, has_steel, left, right = process_img(image, camera_id)65            has_process = True66        else:67            image, has_steel, left, right = image, True, 0, 102468        return has_process, has_steel, image, left, right69    def send(self,image,info):70        print("info",info)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
