Best Python code snippet using slash
baseclasses.py
Source:baseclasses.py  
# -*- coding: utf-8 -*-
'''
Base classes of the pyglet based UI: the Control tree, the Overlay (current
screen) that routes window events into that tree, and the GUI bootstrap.
'''

# -- stdlib --
from collections import defaultdict, deque
from functools import partial
from itertools import groupby
from time import time
import logging
import re

# -- third party --
from pyglet.gl import GL_BACK, GL_BLEND, GL_FILL, GL_FRONT, GL_LINE, GL_MODELVIEW
from pyglet.gl import GL_ONE_MINUS_SRC_ALPHA, GL_PROJECTION, GL_SRC_ALPHA, glBlendFunc, glClearColor
from pyglet.gl import glEnable, glLoadIdentity, glMatrixMode, glOrtho, glPolygonMode, glPopMatrix
from pyglet.gl import glPushMatrix, glTranslatef, glViewport
import pyglet

# -- own --
from utils import hook

# -- code --
WINDOW_WIDTH = 1024
WINDOW_HEIGHT = 700

main_window = None
sched_queue = []

log = logging.getLogger('UI_Baseclasses')

# Record used for a mouse button that has not been pressed/released yet:
# (time(), control under the cursor, x, y)
_NO_MOUSE_RECORD = (0.0, None, 0.0, 0.0)


def _draw_sort_key(c):
    '''
    Sort key for Control.do_draw: z-index first, then the identity of the
    control's batch_draw function, so controls sharing a batch function
    end up adjacent within one z-index.
    '''
    fn = c.batch_draw
    # Unwrap bound methods so every instance maps to the same function id.
    fn = getattr(fn, '__func__', fn)
    # Function objects are not orderable on Python 3; their id() is.
    return (c.zindex, id(fn))


def _is_amd_vendor(vendor):
    '''
    Tell whether an OpenGL vendor string names AMD/ATI.

    Whole words are matched: a plain substring test for 'ati' also hits
    the 'ati' inside 'Corporation' (e.g. 'NVIDIA Corporation').
    '''
    words = re.split(r'[^a-z0-9]+', vendor.lower())
    return 'amd' in words or 'ati' in words


class Control(pyglet.event.EventDispatcher):
    '''
    A rectangular UI element living in a tree of controls.

    x/y are relative to the parent control. Assigning width or height
    dispatches 'on_resize'. Extra keyword arguments become attributes.
    '''

    def __init__(self, x=0, y=0, width=0, height=0,
                 zindex=0, parent=None, can_focus=False, manual_draw=False,
                 *args, **kwargs):
        # Written straight into __dict__, so no 'on_resize' is dispatched
        # for the initial size.
        self.__dict__.update({
            'parent': parent,
            'x': x, 'y': y,
            '_w': width, '_h': height,
            'zindex': zindex,
            'can_focus': can_focus,
            'manual_draw': manual_draw,
        })
        self.__dict__.update(kwargs)
        self.control_list = []

        # control under cursor now, for tracking enter/leave events
        self._control_hit = None

        if parent:
            parent.add_control(self)
        else:
            self.parent = parent
            self.overlay = False

    def _set_w(self, v):
        self._w = v
        self.dispatch_event('on_resize', self._w, self._h)

    def _get_w(self):
        return self._w

    width = property(_get_w, _set_w)

    def _set_h(self, v):
        self._h = v
        self.dispatch_event('on_resize', self._w, self._h)

    def _get_h(self):
        return self._h

    height = property(_get_h, _set_h)

    def add_control(self, c):
        '''Append c as a child and point its parent/overlay at this tree.'''
        self.control_list.append(c)
        c.parent = self
        c.overlay = self if isinstance(self, Overlay) else self.overlay

    def remove_control(self, c):
        '''Detach child c; raises ValueError if c is not a child.'''
        self.control_list.remove(c)
        c.parent = None
        c.overlay = None

    def delete(self):
        '''
        Drop this control's event captures, delete every subcontrol,
        then detach from the parent.
        '''
        # cleanup captures
        if self.overlay:
            for captured in self.overlay._capture_events.values():
                if self in captured:
                    captured.remove(self)

        # Iterate over a copy: each child removes itself from control_list.
        for c in self.control_list[:]:
            c.delete()

        assert not self.control_list

        if self.parent:
            self.parent.remove_control(self)

    def controls_frompoint(self, x, y):
        '''
        Return the direct children whose rectangle contains (x, y), given
        in this control's coordinates, and whose hit_test accepts the point.
        '''
        return [
            c for c in self.control_list
            if c.x <= x <= c.x + c.width and c.y <= y <= c.y + c.height
            and c.hit_test(x - c.x, y - c.y)
        ]

    def control_frompoint1(self, x, y):
        '''
        Return the top-most direct child at (x, y), or None.

        Top-most means highest zindex; among equal zindex the one that
        comes later in control_list wins (the sort is stable).
        '''
        candidates = self.controls_frompoint(x, y)
        candidates.sort(key=lambda c: c.zindex)
        for c in reversed(candidates):
            if c.hit_test(x - c.x, y - c.y):
                return c
        return None

    def control_frompoint1_recursive(self, x, y):
        '''
        Descend through top-most children at (x, y) and return the deepest
        control hit; returns self when no child is hit.
        '''
        c = self
        while True:
            c1 = c.control_frompoint1(x, y)
            if not c1:
                return c
            # Re-express the point in the child's coordinate system.
            x -= c1.x
            y -= c1.y
            c = c1

    def hit_test(self, x, y):
        '''Default hit test: the whole rectangle counts as a hit.'''
        return True

    @staticmethod
    def batch_draw(l):
        '''Draw each control in l with the origin moved to its abs_coords().'''
        glPushMatrix()
        for c in l:
            glLoadIdentity()
            x, y = c.abs_coords()
            glTranslatef(x, y, 0)
            c.draw()
        glPopMatrix()

    def draw(self):
        # default behaviors
        self.draw_subcontrols()

    @staticmethod
    def do_draw(cl):
        '''
        Draw the controls in cl, lowest zindex first.

        cl is sorted in place. Controls flagged manual_draw are skipped.
        Each run of consecutive controls sharing the same batch_draw is
        handed to that function in a single call.
        '''
        cl.sort(key=_draw_sort_key)
        drawable = [c for c in cl if not c.manual_draw]
        for batch_fn, run in groupby(drawable, key=lambda c: c.batch_draw):
            batch_fn(list(run))

    def draw_subcontrols(self):
        self.do_draw(self.control_list)

    def set_focus(self):
        '''
        Make this control the focused one of its overlay, dispatching
        'on_lostfocus' to the previous holder and 'on_focus' to self.

        Does nothing when the control cannot take focus or is not attached
        to an Overlay.
        '''
        if not self.can_focus:
            return

        o = self.parent
        # Stop on a missing parent as well: a detached control has no overlay.
        while o and not isinstance(o, Overlay):
            o = o.parent

        if not o:
            return

        if o.current_focus != self:
            if o.current_focus:
                o.current_focus.dispatch_event('on_lostfocus')
            self.dispatch_event('on_focus')
            o.current_focus = self

    def set_capture(self, *evts):
        '''Register this control as a capturer of the named events.'''
        o = self.overlay
        for e in evts:
            o._capture_events.setdefault(e, []).append(self)

    def release_capture(self, *evts):
        '''
        Unregister one capture of each named event.

        Events this control is not capturing are ignored.
        '''
        o = self.overlay
        if not o:
            return

        for e in evts:
            captured = o._capture_events.get(e)
            if captured and self in captured:
                captured.remove(self)

    def abs_coords(self):
        '''
        Return (x, y) as floats, accumulated up the parent chain until an
        Overlay (or a missing parent) is reached.
        '''
        c, ax, ay = self, 0.0, 0.0
        while c and not isinstance(c, Overlay):
            ax += c.x
            ay += c.y
            c = c.parent
        return (ax, ay)

    def migrate_to(self, new_parent):
        '''
        Re-parent this control under new_parent, adjusting x/y so that
        abs_coords() stays the same.
        '''
        if self in new_parent.control_list:
            return

        ax, ay = self.abs_coords()
        npax, npay = new_parent.abs_coords()
        # NOTE(review): delete() also deletes every subcontrol, so children
        # do not survive a migration -- confirm this is intended.
        self.delete()
        new_parent.add_control(self)
        self.x, self.y = ax - npax, ay - npay

    def on_message(self, *args):
        '''Do nothing'''
        pass

    xy = property(lambda self: (self.x, self.y))


class Overlay(Control):
    '''
    Represents current screen
    FIXME: Should be called Scene
    '''

    class DummyOverlay(object):
        '''Stand-in for "no overlay yet": swallows dispatch_event calls.'''

        def dispatch_event(*args):
            pass

    cur_overlay = DummyOverlay()

    def __init__(self, *args, **kwargs):
        # parent=False: an overlay is the root of its control tree.
        Control.__init__(
            self, width=WINDOW_WIDTH, height=WINDOW_HEIGHT, parent=False
        )
        self.__dict__.update(kwargs)

        # button -> (time(), self._control_hit, x, y)
        # Any button value is accepted; unseen ones read as _NO_MOUSE_RECORD.
        # WONTFIX: Button combinations not supported.
        self.last_mouse_press = defaultdict(lambda: _NO_MOUSE_RECORD)
        self.last_mouse_release = defaultdict(lambda: _NO_MOUSE_RECORD)

        self.current_focus = None
        self._capture_events = {}  # event name -> list of capturing controls
        self.key_state = defaultdict(bool)  # key symbol -> currently pressed?

    def draw(self):
        main_window.clear()
        self.draw_subcontrols()

    def switch(self):
        '''
        Make this overlay the current one and return the previous one.

        The previous overlay receives 'on_switchout', this one is installed
        as the window's handler set and receives 'on_switch'.
        '''
        ori = Overlay.cur_overlay
        ori.dispatch_event('on_switchout')
        Overlay.cur_overlay = self
        main_window.set_handlers(self)
        self.dispatch_event('on_switch')

        # HACK
        import gc
        gc.collect()
        # -----

        return ori

    def on_resize(self, width, height):
        '''
        Set the viewport (scaled by options.zoom) and an orthographic
        projection of width x height units.
        '''
        from options import options
        z = options.zoom
        glViewport(0, 0, int(width * z), int(height * z))
        glMatrixMode(GL_PROJECTION)
        glLoadIdentity()
        glOrtho(0, width, 0, height, -1000, 1000)
        glMatrixMode(GL_MODELVIEW)
        return pyglet.event.EVENT_HANDLED

    def _position_events(self, _type, x, y, *args):
        '''
        Route a positional (mouse) event into the control tree.

        Window coordinates are divided by options.zoom first. Capturing
        controls get the event first; if none of them handles it, it goes
        down the chain of top-most controls under the cursor, deepest
        first. 'on_mouse_enter' / 'on_mouse_leave' are dispatched along the
        way whenever the control under the cursor changes.
        '''
        # Copy: handlers may add/release captures while we dispatch.
        cap_list = self._capture_events.setdefault(_type, [])[:]

        from options import options
        z = options.zoom
        x, y = x / z, y / z

        def dispatch(this, lx, ly):
            # Top most control get event
            c = this.control_frompoint1(lx, ly)
            lc = this._control_hit

            if c != lc:
                # Leave the whole chain of previously hit controls.
                while lc:
                    lc.dispatch_event('on_mouse_leave', lx - lc.x, ly - lc.y)
                    lc._control_hit, lc = None, lc._control_hit

                if c:
                    c.dispatch_event('on_mouse_enter', lx - c.x, ly - c.y)

            this._control_hit = c

            if not c:
                # Now 'this' has no subcontrols hit, so 'this' got this event
                # TODO: if 'this' don't handle it, its parent should handle.
                if this is not self:
                    if _type == 'on_mouse_press':
                        this.set_focus()
            else:
                if dispatch(c, lx - c.x, ly - c.y):  # TODO: not recursive
                    return True

                if c not in cap_list:  # do not redispatch the same event
                    return c.dispatch_event(_type, lx - c.x, ly - c.y, *args)

        # capturing events
        for con in cap_list:
            ax, ay = con.abs_coords()
            if con.dispatch_event(_type, x - ax, y - ay, *args):
                return True

        return dispatch(self, x, y)

    def on_mouse_press(self, x, y, button, modifier):
        # Record before routing: routing updates self._control_hit.
        self.last_mouse_press[button] = (time(), self._control_hit, x, y)
        self._position_events('on_mouse_press', x, y, button, modifier)

    def on_mouse_release(self, x, y, button, modifier):
        '''
        Route the release, then synthesize 'on_mouse_click' and
        'on_mouse_dblclick' from the recorded press/release history.
        '''
        lp = self.last_mouse_press[button]
        lr = self.last_mouse_release[button]
        cr = (time(), self._control_hit, x, y)
        self.last_mouse_release[button] = cr

        self._position_events('on_mouse_release', x, y, button, modifier)

        # single click
        if cr[1] == lp[1]:
            self._position_events('on_mouse_click', x, y, button, modifier)

        # double click
        if cr[0] - lr[0] < 0.2:  # time limit
            if abs(cr[2] - lr[2]) + abs(cr[3] - lr[3]) < 4:  # shift limit
                if cr[1] == lr[1]:  # Control limit
                    self._position_events('on_mouse_dblclick', x, y, button, modifier)

    def on_mouse_motion(self, *args):
        return self._position_events('on_mouse_motion', *args)

    def on_mouse_drag(self, *args):
        return self._position_events('on_mouse_drag', *args)

    def on_mouse_scroll(self, *args):
        return self._position_events('on_mouse_scroll', *args)

    def _text_events(self, _type, *args):
        '''
        Deliver a keyboard/text event to the most recent capturer of that
        event (if any) and to the focused control (if any).
        '''
        cap_list = self._capture_events.setdefault(_type, [])
        captor = cap_list[-1] if cap_list else None
        if captor is not None:
            captor.dispatch_event(_type, *args)

        focused = self.current_focus
        # do not redispatch the same event to the same control
        if focused and focused is not captor:
            focused.dispatch_event(_type, *args)

    def on_key_press(self, symbol, modifiers):
        self.key_state[symbol] = True
        return self._text_events('on_key_press', symbol, modifiers)

    def on_key_release(self, symbol, modifiers):
        self.key_state[symbol] = False
        return self._text_events('on_key_release', symbol, modifiers)

    def on_text(self, *args):
        return self._text_events('on_text', *args)

    def on_text_motion(self, *args):
        return self._text_events('on_text_motion', *args)

    def on_text_motion_select(self, *args):
        return self._text_events('on_text_motion_select', *args)

    def dispatch_message(self, args):
        '''
        Dispatch 'on_message' with *args to this overlay and every control
        below it, breadth first.
        '''
        pending = deque([self])
        while pending:
            c = pending.popleft()
            c.dispatch_event('on_message', *args)
            pending.extend(c.control_list)

    def on_message(self, _type, *args):
        if _type == 'app_exit':
            pyglet.app.exit()


for _event_name in (
    'on_key_press',
    'on_key_release',
    'on_text',
    'on_text_motion',
    'on_text_motion_select',
    'on_mouse_motion',
    'on_mouse_press',
    'on_mouse_drag',
    'on_mouse_release',
    'on_mouse_scroll',
    'on_mouse_enter',
    'on_mouse_leave',
    'on_mouse_click',
    'on_mouse_dblclick',
    'on_focus',
    'on_lostfocus',
    'on_message',
    'on_resize',
):
    Control.register_event_type(_event_name)

for _event_name in ('on_switch', 'on_switchout'):
    Overlay.register_event_type(_event_name)

del _event_name


def init_gui():
    '''
    Create the main window, set up the GL state, install the default
    overlay and window handlers, and start the periodic callback that
    runs functions queued with ui_schedule().
    '''
    global main_window, sched_queue, current_time, fps_limit

    config = pyglet.gl.Config(
        double_buffer=True,
        buffer_size=32,
        aux_buffers=0,
        sample_buffers=0,
        samples=0,
        red_size=8,
        green_size=8,
        blue_size=8,
        alpha_size=8,
        depth_size=0,
        stencil_size=0,
        accum_red_size=0,
        accum_green_size=0,
        accum_blue_size=0,
        accum_alpha_size=0,
    )

    from options import options
    z = options.zoom
    main_window = pyglet.window.Window(
        width=int(WINDOW_WIDTH * z), height=int(WINDOW_HEIGHT * z),
        # Title restored from mis-decoded UTF-8 bytes in this copy.
        caption=u'东方符斗祭',
        config=config, visible=False
    )
    sched_queue = []

    from pyglet.gl import gl_info
    if _is_amd_vendor(gl_info.get_vendor()):
        pyglet.options['graphics_vbo'] = False  # AMD: Do you have QA team for your OpenGL driver ????
        from pyglet.graphics import vertexbuffer
        assert not vertexbuffer._enable_vbo

    # custom font renderer
    from .font import AncientPixFont
    pyglet.font._font_class = AncientPixFont

    # main window setup {{
    glClearColor(1, 1, 1, 1)
    glEnable(GL_BLEND)
    glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA)
    # glEnable(GL_SCISSOR_TEST)
    glPolygonMode(GL_FRONT, GL_FILL)
    glPolygonMode(GL_BACK, GL_LINE)

    def_overlay = Overlay()
    def_overlay.name = 'Overlay'
    def_overlay.switch()
    # }} main window setup

    fps = pyglet.clock.ClockDisplay()
    fps_limit = 60
    delay = 1. / fps_limit
    current_time = time()

    @main_window.event
    def on_draw():
        global current_time
        current_time = time()
        Overlay.cur_overlay.draw()
        fps.draw()

    @main_window.event
    def on_close():
        # HACK: suppress exceptions,
        # dont want to explain....
        pyglet.clock.tick = lambda a: 0.0
        pyglet.app.exit()

    @hook(main_window)
    def on_key_press(ori, symbol, modifiers):
        # Swallow ESC; every other key goes to the original handler.
        if symbol == pyglet.window.key.ESCAPE:
            return pyglet.event.EVENT_HANDLED
        return ori(symbol, modifiers)

    def _dispatch_msg(dt):
        global sched_queue
        import gevent

        # give logics a chance to run
        gevent.sleep(0.001)

        if not sched_queue:
            return

        # Swap the queue out before running it: callbacks scheduled while
        # we run go to the next tick instead of extending this loop, and a
        # raising callback cannot leave already-run callbacks queued.
        pending, sched_queue = sched_queue, []
        for func in pending:
            func()

    pyglet.clock.schedule_interval(_dispatch_msg, delay)


def ui_schedule(func, *args, **kwargs):
    '''Queue func(*args, **kwargs) to be run by the UI's periodic callback.'''
    sched_queue.append(partial(func, *args, **kwargs))


def process_msg(args):
    '''Dispatch a message (sequence of arguments) through the current overlay.'''
    Overlay.cur_overlay.dispatch_message(args)


def ui_message(*args):
    '''
    Send message to UI
    '''
    # NOTE(review): this excerpt is truncated right after the docstring
    # ('...'); the body of ui_message and the rest of the module are not
    # shown and must be restored from the original baseclasses.py.


# (excerpt ends here; the page continues with broker_event_consumer.py)
Source:broker_event_consumer.py  
...37        self._run_from_broker()38    def _run_from_broker(self):39        """Load the events from celery"""40        try:41            self._capture_events()42        finally:43            self._on_cleanup()44    def _capture_events(self):45        try_interval = 146        while not self._is_root_complete():47            try:48                try_interval *= 249                with self.celery_app.connection() as conn:50                    conn.ensure_connection(max_retries=1, interval_start=0)51                    self.celery_event_receiver = EventReceiver(52                        conn,53                        handlers={"*": self._on_event},54                        app=self.celery_app)55                    try_interval = 156                    self._ready()57                    self.celery_event_receiver.capture(58                        limit=None, timeout=None, wakeup=True)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
