Best Python code snippet using playwright-python
mod_turnByTurn.py
Source:mod_turnByTurn.py  
1# -*- coding: utf-8 -*-2#----------------------------------------------------------------------------3# A turn by turn navigation module.4#----------------------------------------------------------------------------5# Copyright 2007, Oliver White6#7# This program is free software: you can redistribute it and/or modify8# it under the terms of the GNU General Public License as published by9# the Free Software Foundation, either version 3 of the License, or10# (at your option) any later version.11#12# This program is distributed in the hope that it will be useful,13# but WITHOUT ANY WARRANTY; without even the implied warranty of14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the15# GNU General Public License for more details.16#17# You should have received a copy of the GNU General Public License18# along with this program.  If not, see <http://www.gnu.org/licenses/>.19#---------------------------------------------------------------------------20from modules.base_module import RanaModule21from core import geo22from core import threads23from core import constants24from core.signal import Signal25import time26from threading import RLock27REROUTE_CHECK_INTERVAL = 5000  # in ms28# in m/s, about 46 km/h - if this speed is reached, the rerouting threshold is multiplied29# by REROUTING_THRESHOLD_MULTIPLIER30INCREASE_REROUTING_THRESHOLD_SPEED = 2031REROUTING_DEFAULT_THRESHOLD = 3032# not enabled at the moment - needs more field testing33REROUTING_THRESHOLD_MULTIPLIER = 1.034# how many times needs the threshold be crossed to35# trigger rerouting36REROUTING_TRIGGER_COUNT = 337MAX_CONSECUTIVE_AUTOMATIC_REROUTES = 338AUTOMATIC_REROUTE_COUNTER_EXPIRATION_TIME = 600  # in seconds39def getModule(*args, **kwargs):40    return TurnByTurn(*args, **kwargs)41class TurnByTurn(RanaModule):42    """A turn by turn navigation module."""43    def __init__(self, *args, **kwargs):44        RanaModule.__init__(self, *args, **kwargs)45        # initial colors46        self.navigationBoxBackground = (0, 0, 1, 0.3)  # very transparent blue47        self.navigationBoxText = (1, 1, 1, 1)  # non-transparent white48        self._tbt_worker_lock = RLock()49        self._tbt_worker_enabled = False50        self._go_to_initial_state()51        self._automatic_reroute_counter = 0  # counts consecutive automatic reroutes52        self._last_automatic_reroute_timestamp = time.time()53        # reroute even though the route was not yet reached (for special cases)54        self._override_route_reached = False55        # signals56        self.navigation_started = Signal()57        self.navigation_stopped = Signal()58        self.destination_reached = Signal()59        self.rerouting_triggered = Signal()60        self.current_step_changed = Signal()61    def _go_to_initial_state(self):62        """restore initial state"""63        self._route = None64        self._current_step_index_value = 065        self._current_step_indicator = None66        self._espeak_first_and_half_trigger = False67        self._espeak_first_trigger = False68        self._espeak_second_trigger = False69        self._current_distance = None70        self._current_step = None71        self._navigation_box_hidden = False72        self._m_route_length = 073        self._location_watch_id = None74        self._on_route = False75        # rerouting is enabled once the route is reached for the first time76        self._route_reached = False77        self._rerouting_threshold_multiplier = 1.078        self._rerouting_threshold_crossed_counter = 079    def firstTime(self):80        icons = self.m.get('icons', None)81        if icons:82            icons.subscribeColorInfo(self, self._colors_changed_cb)83    def _colors_changed_cb(self, colors):84        self.navigationBoxBackground = colors['navigation_box_background'].getCairoColor()85        self.navigationBoxText = colors['navigation_box_text'].getCairoColor()86    def handleMessage(self, message, messageType, args):87        if message == 'start':88            if messageType == 'ms':89                self.start_tbt(from_where=args)90        elif message == 'stop':91            self.stop_tbt()92        elif message == 'reroute':  # manual rerouting93            # reset automatic reroute counter94            self._automatic_reroute_counter = 095            self._reroute()96        elif message == "toggleBoxHiding":97            self.log.info("toggling navigation box visibility")98            self._navigation_box_hidden = not self._navigation_box_hidden99        elif message == "switchToPreviousTurn":100            self.switch_to_previous_step()101        elif message == "switchToNextTurn":102            self.switch_to_next_step()103        elif message == "showMessageInsideNotification":104            if self.current_step:105                message = "<i>turn description:</i>\n%s" % self.current_step.description106                if self.dmod.has_custom_notification_support:107                    self.dmod.notify(message, 7000)108                    # TODO: add support for modRana notifications once they support line wrapping109    def _reroute_auto(self):110        """This function is called when automatic rerouting is triggered."""111        # check time from last automatic reroute112        dt = time.time() - self._last_automatic_reroute_timestamp113        if dt >= AUTOMATIC_REROUTE_COUNTER_EXPIRATION_TIME:114            # reset the automatic reroute counter115            self._automatic_reroute_counter = 0116            self.log.debug('automatic reroute counter expired, clearing')117        # on some routes, when we are moving away from the start of the route, it118        # is needed to reroute a couple of times before the correct way is found119        # on the other hand there should be a limit on the number of times120        # modRana reroutes in a row121        #122        # SOLUTION:123        # 1. enable automatic rerouting even though the route was not yet reached124        # (as we are moving away from it)125        # 2. do this only a limited number of times (up to 3 times in a row)126        # 3. the counter is reset by manual rerouting, by reaching the route or after 10 minutes127        if self._automatic_reroute_counter < MAX_CONSECUTIVE_AUTOMATIC_REROUTES:128            self.log.debug('faking that route was reached to enable new rerouting')129            self._override_route_reached = True130        else:131            self.log.info('tbt: too many consecutive reroutes (%d),', self._automatic_reroute_counter)132            self.log.info('reach the route to enable automatic rerouting')133            self.log.info('or reroute manually')134        # increment the automatic reroute counter & update the timestamp135        self._automatic_reroute_counter += 1136        self._last_automatic_reroute_timestamp = time.time()137        # trigger rerouting138        self._reroute()139        self.rerouting_triggered()140    def _reroute(self):141        # 1. say rerouting is in progress142        voiceMessage = "rerouting"143        voice = self.m.get('voice', None)144        if voice:145            voice.say(voiceMessage, "en")  # make sure rerouting said with english voice146        time.sleep(2)  # TODO: improve this147        route = self.m.get('route', None)148        if route:149            # 2. get a new route from current position to destination150            route.reroute()151        # 3. restart routing for to this new route from the closest point152        self.sendMessage("ms:turnByTurn:start:closest")153    @property154    def automatic_rerouting_enabled(self):155        """Report if automatic rerouting is enabled.156        Automatic rerouting is considered to be enabled if the reroutingThreshold key157        holds a value that is not None (in general a string representation of a floating158        point number).159        :return: if automatic rerouting is enabled160        :rtype: bool161        """162        return self.get('reroutingThreshold', REROUTING_DEFAULT_THRESHOLD)163    def _say_turn(self, message, distanceInMeters, force_language_code=False):164        """Say a text-to-speech message about a turn.165        This basically wraps the simple say method from voice and adds some more information,166        like current distance to the turn.167        """168        voice = self.m.get('voice', None)169        units = self.m.get('units', None)170        if voice and units:171            (dist_string, short, long) = units.humanRound(distanceInMeters)172            if dist_string == "0":173                dist_string = ""174            else:175                dist_string = '<p xml:lang="en">in <emphasis level="strong">' + dist_string + ' ' + long + '</emphasis></p><br>'176                # TODO: language specific distance strings177            text = dist_string + message178            #      """ the message can contain unicode, this might cause an exception when printing it179            #      in some systems (SHR-u on Neo, for example)"""180            #      try:181            #        print("saying: %s" % text)182            #        pass183            #      except UnicodeEncodeError:184            #        print("voice: printing the current message to stdout failed do to unicode conversion error")185            if force_language_code:186                espeak_language_code = force_language_code187            else:188                # the espeak language code is the first part of this whitespace delimited string189                espeak_language_code = self.get('directionsLanguage', 'en en').split(" ")[0]190            return voice.say(text, espeak_language_code)191    @property192    def _max_step_index(self):193        return self._route.message_point_count - 1194    def _get_starting_step(self, which='first'):195        if self._route:196            if which == 'first':197                return self._get_step(0)198            if which == 'closest':199                return self._get_closest_step()200    def _get_closest_step(self):201        """Get the geographically closest step."""202        proj = self.m.get('projection', None)  # we also need the projection module203        pos = self.get('pos', None)  # and current position204        if pos and proj:205            (lat1, lon1) = pos206            temp_steps = self._route.message_points207            for step in temp_steps:208                (lat2, lon2) = step.getLL()209                step.current_distance = geo.distance(lat1, lon1, lat2, lon2) * 1000  # km to m210            closest_step = sorted(temp_steps, key=lambda x: x.current_distance)[0]211            return closest_step212    def _get_step(self, index):213        """Return steps for valid index, None otherwise."""214        max_index = self._max_step_index215        if index > max_index or index < -(max_index + 1):216            self.log.error("wrong turn index: %d, max index is: %d", index, max_index)217            return None218        else:219            return self._route.get_message_point_by_index(index)220    def _get_step_index(self, step):221        return self._route.get_message_point_index(step)222    @property223    def current_step(self):224        """Return current step."""225        return self._route.get_message_point_by_index(self._current_step_index)226    @current_step.setter227    def current_step(self, step):228        """Set a given step as current step."""229        mp_index = self._route.get_message_point_index(step)230        self._current_step_index = mp_index231        self.current_step_changed(step)232    @property233    def _current_step_index(self):234        return self._current_step_index_value235    @_current_step_index.setter236    def _current_step_index(self, index):237        self._current_step_index_value = index238        # trigger a navigation update every time current step index is set239        self._do_navigation_update()240    def switch_to_previous_step(self):241        """Switch to previous step and clean up."""242        next_index = self._current_step_index - 1243        if next_index >= 0:244            self._current_step_index = next_index245            self._espeak_first_trigger = False246            self._espeak_second_trigger = False247            self.log.info("switching to previous step")248            self.current_step_changed(self.current_step)249        else:250            self.log.info("previous step reached")251    def switch_to_next_step(self):252        """Switch to next step and clean up."""253        max_index = self._max_step_index254        next_index = self._current_step_index + 1255        if next_index <= max_index:256            self._current_step_index = next_index257            self._espeak_first_and_half_trigger = False258            self._espeak_first_trigger = False259            self._espeak_second_trigger = False260            self.log.info("switching to next step")261            self.current_step_changed(self.current_step)262        else:263            self.log.info("last step reached")264            self._last_step_reached()265    def _last_step_reached(self):266        """Handle all tasks that are needed once the last step is reached."""267        # disable automatic rerouting268        self._stop_tbt_worker()269        # automatic rerouting needs to be disabled to prevent rerouting270        # once the destination was reached271        self.destination_reached()272    def enabled(self):273        """Return if Turn by Turn navigation is enabled."""274        if self._route:275            return True276        else:277            return False278    def start_tbt(self, from_where='first'):279        """Start Turn-by-turn navigation."""280        # clean up any possible previous navigation data281        self._go_to_initial_state()282        # NOTE: turn and step are used interchangeably in the documentation283        m = self.m.get('route', None)284        if m:285            route = m.get_current_directions()286            if route:  # is the route nonempty ?287                self._route = route288                # get route in radians for automatic rerouting289                self.radiansRoute = route.get_points_lle_radians(drop_elevation=True)290                # start rerouting watch291                self._start_tbt_worker()292                # for some reason the combined distance does not account for the last step293                self._m_route_length = route.length294                # some statistics295                meters_per_sec_speed = self.get('metersPerSecSpeed', None)296                dt = m.route_lookup_duration297                self.log.info("route lookup took: %f s" % dt)298                if dt and meters_per_sec_speed:299                    dm = dt * meters_per_sec_speed300                    self.log.info("distance traveled during lookup: %f m" % dm)301                    # the duration of the road lookup and other variables are currently not used302                # in the heuristics but might be added later to make the heuristics more robust303                # now we decide if we use the closest turn, or the next one,304                # as we might be already past it and on our way to the next turn305                cs = self._get_closest_step()  # get geographically closest step306                pos = self.get('pos', None)  # get current position307                p_reached_dist = int(self.get('pointReachedDistance', 30))  # get the trigger distance308                next_turn_id = self._get_step_index(cs) + 1309                next_step = self._get_step(next_turn_id)310                # check if we have all the data needed for our heuristics311                self.log.info("trying to guess correct step to start navigation")312                if next_step and pos and p_reached_dist:313                    (lat, lon) = pos314                    (csLat, csLon) = cs.getLL()315                    (nsLat, nsLon) = next_step.getLL()316                    pos2next_step = geo.distance(lat, lon, nsLat, nsLon) * 1000317                    pos2current_step = geo.distance(lat, lon, csLat, csLon) * 1000318                    current_step2next_step = geo.distance(csLat, csLon, nsLat, nsLon) * 1000319                    #          self.log.debug("pos",(lat,lon))320                    #          self.log.debug("cs",(csLat,csLon))321                    #          self.log.debug("ns",(nsLat,nsLon))322                    self.log.debug("position to next turn: %f m" % pos2next_step)323                    self.log.debug("position to current turn: %f m" % pos2current_step)324                    self.log.debug("current turn to next turn: %f m" % current_step2next_step)325                    self.log.debug("turn reached trigger distance: %f m" % p_reached_dist)326                    if pos2current_step > p_reached_dist:327                        # this means we are out of the "capture circle" of the closest step328                        # what is more distant, the closest or the next step ?329                        if pos2next_step < current_step2next_step:330                            # we are mostly probably already past the closest step,331                            # so we switch to the next step at once332                            self.log.debug("already past closest turn, switching to next turn")333                            self.current_step = next_step334                            # we play the message for the next step,335                            # with current distance to this step,336                            # to assure there is some voice output immediately after337                            # getting a new route or rerouting"""338                            plaintextMessage = next_step.ssml_message339                            self._say_turn(plaintextMessage, pos2next_step)340                        else:341                            # we have probably not yet reached the closest step,342                            # so we start navigation from it343                            self.log.debug("closest turn not yet reached")344                            self.current_step = cs345                    else:346                        # we are inside the  "capture circle" of the closest step,347                        # this means the navigation will trigger the voice message by itself348                        # and correctly switch to next step349                        # -> no need to switch to next step from here350                        self.log.debug("inside reach distance of closest turn")351                        self.current_step = cs352                else:353                    # we dont have some of the data, that is needed to decide354                    # if we start the navigation from the closest step of from the step that is after it355                    # -> we just start from the closest step356                    self.log.debug("not enough data to decide, using closest turn")357                    self.current_step = cs358        self._do_navigation_update()  # run a first time navigation update359        self._location_watch_id = self.watch('locationUpdated', self.location_update_cb)360        self.log.info("started and ready")361        # trigger the navigation-started signal362        self.navigation_started()363    def stop_tbt(self):364        """Stop Turn-by-Turn navigation."""365        # remove location watch366        if self._location_watch_id:367            self.removeWatch(self._location_watch_id)368        # cleanup369        self._go_to_initial_state()370        self._stop_tbt_worker()371        self.log.info("stopped")372        self.navigation_stopped()373    def location_update_cb(self, key, newValue, oldValue):374        """Position changed, do a tbt navigation update."""375        # TODO: use a Signal for this ?376        if key == "locationUpdated":  # just to be sure377            self._do_navigation_update()378        else:379            self.log.error("invalid key: %r", key)380    def _do_navigation_update(self):381        """Do a navigation update."""382        # make sure there really are some steps383        if not self._route:384            self.log.error("no route")385            return386        pos = self.get('pos', None)387        if pos is None:388            self.log.error("skipping update, invalid position")389            return390        # get/compute/update necessary the values391        lat1, lon1 = pos392        lat2, lon2 = self.current_step.getLL()393        current_distance = geo.distance(lat1, lon1, lat2, lon2) * 1000  # km to m394        self._current_distance = current_distance  # update current distance395        # use some sane minimum distance396        distance = int(self.get('minAnnounceDistance', 100))397        # GHK: make distance speed-sensitive398        #399        # I came up with this formula after a lot of experimentation with400        # gnuplot.  The idea is to give the user some simple parameters to401        # adjust yet let him have a lot of control.  There are five402        # parameters in the equation:403        #404        # low_speed	Speed below which the pre-announcement time is constant.405        # low_time	Announcement time at and below lowSpeed.406        # high_speed	Speed above which the announcement time is constant.407        # high_time	Announcement time at and above highSpeed.408        # power	Exponential power used in the formula; good values are 0.5-5409        #410        # The speeds are in m/s.  Obviously highXXX must be greater than lowXXX.411        # If power is 1.0, announcement times increase linearly above lowSpeed.412        # If power < 1.0, times rise rapidly just above lowSpeed and more413        # gradually approaching highSpeed.  If power > 1.0, times rise414        # gradually at first and rapidly near highSpeed.  I like power > 1.0.415        #416        # The reasoning is that at high speeds you are probably on a417        # motorway/freeway and will need extra time to get into the proper418        # lane to take your exit.  That reasoning is pretty harmless on a419        # high-speed two-lane road, but it breaks down if you are stuck in420        # heavy traffic on a four-lane freeway (like in Los Angeles421        # where I live) because you might need quite a while to work your422        # way across the traffic even though you're creeping along.  But I423        # don't know a good way to detect whether you're on a multi-lane road,424        # I chose speed as an acceptable proxy.425        #426        # Regardless of speed, we always warn a certain distance ahead (see427        # "distance" above).  That distance comes from the value in the current428        # step of the directions.429        #430        # BTW, if you want to use gnuplot to play with the curves, try:431        # max(a,b) = a > b ? a : b432        # min(a,b) = a < b ? a : b433        # warn(x,t1,s1,t2,s2,p) = min(t2,(max(s1,x)-s1)**p*(t2-t1)/(s2-s1)**p+t1)434        # plot [0:160][0:] warn(x,10,50,60,100,2.0)435        #436        meters_per_sec_speed = self.get('metersPerSecSpeed', None)437        point_reached_distance = int(self.get('pointReachedDistance', 30))438        if meters_per_sec_speed:439            # check if we can miss the point by going too fast -> mps speed > point reached distance440            # also enlarge the rerouting threshold as it looks like it needs to be larger441            # when moving at high speed to prevent unnecessary rerouting442            if meters_per_sec_speed > point_reached_distance * 0.75:443                point_reached_distance = meters_per_sec_speed * 2444            #        self.log.debug("tbt: enlarging point reached distance to: %1.2f m due to large speed (%1.2f m/s)". (pointReachedDistance, metersPerSecSpeed)445            if meters_per_sec_speed > INCREASE_REROUTING_THRESHOLD_SPEED:446                self._rerouting_threshold_multiplier = REROUTING_THRESHOLD_MULTIPLIER447            else:448                self._rerouting_threshold_multiplier = 1.0449            # speed & time based triggering450            low_speed = float(self.get('minAnnounceSpeed', 13.89))451            high_speed = float(self.get('maxAnnounceSpeed', 27.78))452            high_speed = max(high_speed, low_speed + 0.1)453            low_time = int(self.get('minAnnounceTime', 10))454            high_time = int(self.get('maxAnnounceTime', 60))455            high_time = max(high_time, low_time)456            power = float(self.get('announcePower', 2.0))457            warn_time = (max(low_speed, meters_per_sec_speed) - low_speed) ** power \458                       * (high_time - low_time) / (high_speed - low_speed) ** power \459                       + low_time460            warn_time = min(high_time, warn_time)461            distance = max(distance, warn_time * meters_per_sec_speed)462            if self.get('debugTbT', False):463                self.log.debug("#####")464                self.log.debug("min/max announce time: %d/%d s", low_time, high_time)465                self.log.debug("trigger distance: %1.2f m (%1.2f s warning)", distance, distance / float(meters_per_sec_speed))466                self.log.debug("current distance: %1.2f m", current_distance)467                self.log.debug("current speed: %1.2f m/s (%1.2f km/h)", meters_per_sec_speed, meters_per_sec_speed * 3.6)468                self.log.debug("point reached distance: %f m", point_reached_distance)469                self.log.debug("1. triggered=%r, 1.5. triggered=%r, 2. triggered=%r",470                               self._espeak_first_trigger, self._espeak_first_and_half_trigger, self._espeak_second_trigger)471                if warn_time > 30:472                    self.log.debug("optional (20 s) trigger distance: %1.2f", 20.0 * meters_per_sec_speed)473            if current_distance <= point_reached_distance:474                # this means we reached the point"""475                if self._espeak_second_trigger is False:476                    self.log.debug("triggering espeak nr. 2")477                    # say the message without distance478                    plaintextMessage = self.current_step.ssml_message479                    # consider turn said even if it was skipped (ignore errors)480                    self._say_turn(plaintextMessage, 0)481                    self.current_step.visited = True  # mark this point as visited482                    self._espeak_first_trigger = True  # everything has been said, again :D483                    self._espeak_second_trigger = True  # everything has been said, again :D484                self.switch_to_next_step()  # switch to next step485            else:486                if current_distance <= distance:487                    # this means we reached an optimal distance for saying the message"""488                    if self._espeak_first_trigger is False:489                        self.log.debug("triggering espeak nr. 1")490                        plaintextMessage = self.current_step.ssml_message491                        if self._say_turn(plaintextMessage, current_distance):492                            self._espeak_first_trigger = True  # first message done493                if self._espeak_first_and_half_trigger is False and warn_time > 30:494                    if current_distance <= (20.0 * meters_per_sec_speed):495                        # in case that the warning time gets too big, add an intermediate warning at 20 seconds496                        # NOTE: this means it is said after the first trigger497                        plaintextMessage = self.current_step.ssml_message498                        if self._say_turn(plaintextMessage, current_distance):499                            self._espeak_first_and_half_trigger = True  # intermediate message done500            ## automatic rerouting ##501            # is automatic rerouting enabled from options502            # enabled == threshold that is not not None503            if self.automatic_rerouting_enabled:504                # rerouting is enabled only once the route is reached for the first time505                if self._on_route and not self._route_reached:506                    self._route_reached = True507                    self._automatic_reroute_counter = 0508                    self.log.info('route reached, rerouting enabled')509                # did the TBT worker detect that the rerouting threshold was reached ?510                if self._rerouting_conditions_met():511                    # test if enough consecutive divergence point were recorded512                    if self._rerouting_threshold_crossed_counter >= REROUTING_TRIGGER_COUNT:513                        # reset the routeReached override514                        self._override_route_reached = False515                        # trigger rerouting516                        self._reroute_auto()517                else:518                    # reset the counter519                    self._rerouting_threshold_crossed_counter = 0520    def _rerouting_conditions_met(self):521        return (self._route_reached or self._override_route_reached) and not self._on_route522    def _following_route(self):523        """Are we still following the route or is rerouting needed ?"""524        start1 = time.perf_counter()525        pos = self.get('pos', None)526        proj = self.m.get('projection', None)527        if pos and proj:528            pLat, pLon = pos529            # we use Radians to get rid of radian conversion overhead for530            # the geographic distance computation method531            radians_ll = self.radiansRoute532            pLat = geo.radians(pLat)533            pLon = geo.radians(pLon)534            if len(radians_ll) == 0:535                self.log.error("Divergence: can't follow a zero point route")536                return False537            elif len(radians_ll) == 1:  # 1 point route538                aLat, aLon = radians_ll[0]539                min_distance = geo.distance_approx_radians(pLat, pLon, aLat, aLon)540            else:  # 2+ points route541                aLat, aLon = radians_ll[0]542                bLat, bLon = radians_ll[1]543                min_distance = geo.distance_point_to_line_radians(pLat, pLon, aLat, aLon, bLat, bLon)544                aLat, aLon = bLat, bLon545                for point in radians_ll[1:]:546                    bLat, bLon = point547                    dist = geo.distance_point_to_line_radians(pLat, pLon, aLat, aLon, bLat, bLon)548                    if dist < min_distance:549                        min_distance = dist550                    aLat, aLon = bLat, bLon551                # the multiplier tries to compensate for high speed movement552            threshold = float(553                self.get('reroutingThreshold', REROUTING_DEFAULT_THRESHOLD)) * self._rerouting_threshold_multiplier554            self.log.debug("Divergence from route: %1.2f/%1.2f m computed in %1.0f ms",555            min_distance * 1000, float(threshold), (1000 * (time.perf_counter() - start1)))556            return min_distance * 1000 < threshold557    def _start_tbt_worker(self):558        with self._tbt_worker_lock:559            # reuse previous thread or start new one560            if self._tbt_worker_enabled:561                self.log.info("reusing TBT worker thread")562            else:563                self.log.info("starting new TBT worker thread")564                t = threads.ModRanaThread(name=constants.THREAD_TBT_WORKER,565                                          target=self._tbt_worker)566                threads.threadMgr.add(t)567                self._tbt_worker_enabled = True568    def _stop_tbt_worker(self):569        with self._tbt_worker_lock:570            self.log.info("stopping the TBT worker thread")571            self._tbt_worker_enabled = False572    def _tbt_worker(self):573        """This function runs in its own thread and checks if we are still following the route."""574        self.log.info("TBTWorker: started")575        # The _tbt_worker_enabled variable is needed as once the end of the route is reached576        # there will be a route set but further rerouting should not be performed.577        while True:578            with self._tbt_worker_lock:579                # Either tbt has been shut down (no route is set)580                # or rerouting is no longer needed.581                if not self._route or not self._tbt_worker_enabled:582                    self.log.info("TBTWorker: shutting down")583                    break584            # first make sure automatic rerouting is enabled585            # eq. reroutingThreshold != None586            if self.automatic_rerouting_enabled:587                # check if we are still following the route588                # self.log.debug('TBTWorker: checking divergence from route')589                self._on_route = self._following_route()590                if self._rerouting_conditions_met():591                    self.log.info('TBTWorker: divergence detected')592                    # switch to quick updates593                    for i in range(0, REROUTING_TRIGGER_COUNT + 1):594                        time.sleep(1)595                        onRoute = self._following_route()596                        if onRoute:  # divergence stopped597                            self._on_route = onRoute598                            self.log.info('TBTWorker: false alarm')599                            break600                        else:  # still diverging from current route601                            self._on_route = onRoute602                            # increase divergence counter603                            self._rerouting_threshold_crossed_counter += 1604                            self.log.debug('TBTWorker: increasing divergence counter (%d)',605                                           self._rerouting_threshold_crossed_counter)606            time.sleep(REROUTE_CHECK_INTERVAL / 1000.0)607    def shutdown(self):608        # cleanup..._page.py
Source:_page.py  
...193            ),194        )195        self._channel.on(196            "route",197            lambda params: self._on_route(198                from_channel(params["route"]), from_channel(params["request"])199            ),200        )201        self._channel.on(202            "video",203            lambda params: cast(Video, self.video)._set_relative_path(204                params["relativePath"]205            ),206        )207        self._channel.on(208            "webSocket",209            lambda params: self.emit(210                Page.Events.WebSocket, from_channel(params["webSocket"])211            ),212        )213        self._channel.on(214            "worker", lambda params: self._on_worker(from_channel(params["worker"]))215        )216    def _set_browser_context(self, context: "BrowserContext") -> None:217        self._browser_context = context218        self._timeout_settings = TimeoutSettings(context._timeout_settings)219    def _on_request_failed(220        self,221        request: Request,222        response_end_timing: float,223        failure_text: str = None,224    ) -> None:225        request._failure_text = failure_text226        if request._timing:227            request._timing["responseEnd"] = response_end_timing228        self.emit(Page.Events.RequestFailed, request)229    def _on_request_finished(230        self, request: Request, response_end_timing: float231    ) -> None:232        if request._timing:233            request._timing["responseEnd"] = response_end_timing234        self.emit(Page.Events.RequestFinished, request)235    def _on_frame_attached(self, frame: Frame) -> None:236        frame._page = self237        self._frames.append(frame)238        self.emit(Page.Events.FrameAttached, frame)239    def _on_frame_detached(self, frame: Frame) -> None:240        self._frames.remove(frame)241        frame._detached = True242        self.emit(Page.Events.FrameDetached, frame)243    def _on_route(self, route: Route, request: Request) -> None:244        for handler_entry in self._routes:245            if handler_entry.matcher.matches(request.url):246                result = cast(Any, handler_entry.handler)(route, request)247                if inspect.iscoroutine(result):248                    asyncio.create_task(result)249                return250        self._browser_context._on_route(route, request)251    def _on_binding(self, binding_call: "BindingCall") -> None:252        func = self._bindings.get(binding_call._initializer["name"])253        if func:254            asyncio.create_task(binding_call.call(func))255        self._browser_context._on_binding(binding_call)256    def _on_worker(self, worker: "Worker") -> None:257        self._workers.append(worker)258        worker._page = self259        self.emit(Page.Events.Worker, worker)260    def _on_close(self) -> None:261        self._is_closed = True262        self._browser_context._pages.remove(self)263        self.emit(Page.Events.Close)264    def _on_crash(self) -> None:...two_way_env.py
Source:two_way_env.py  
...75        #self.print_obs_space(ref_vehicle=self.vehicle)76        #self.print_obs_space(ref_vehicle=self.idmdp_vehicle)77        #self._set_curriculam(curriculam_reward_threshold=0.6*self.GOAL_REWARD)78        return (obs, rew, done, info)79    def _on_route(self, veh=None):80        if veh is None:81            veh = self.vehicle82        lane_ID = veh.lane_index[2]83        onroute = (lane_ID == 1)84        return onroute85    86    def _on_road(self, veh=None):87        if veh is None:88            veh = self.vehicle89        return (veh.position[0] < self.ROAD_LENGTH) and (veh.position[0] > 0)90    def _goal_achieved(self, veh=None):91        if veh is None:92            veh = self.vehicle93        return (veh.position[0] > self.config["GOAL_LENGTH"] and \94                self._on_route(veh))95    def _reward(self, action):96        """97            The vehicle is rewarded for driving with high velocity98        :param action: the action performed99        :return: the reward of the state-action transition100        """101        #neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)102        collision_reward = self.config["COLLISION_REWARD"] * self.vehicle.crashed103        velocity_reward = self.config["VELOCITY_REWARD"] * (self.vehicle.velocity_index -1) / (self.vehicle.SPEED_COUNT - 1)104        if (velocity_reward > 0):105            velocity_reward *= self._on_route()106        goal_reward = self.config["GOAL_REWARD"]107        if self.vehicle.crashed:108            reward = collision_reward + min(0.0, velocity_reward)109        elif self._goal_achieved():110            reward = goal_reward + velocity_reward111        else:112            reward = velocity_reward113        if not self.vehicle.action_validity:114            reward = reward + self.config["INVALID_ACTION_REWARD"]115        return reward116    def _is_terminal(self):117        """118            The episode is over if the ego vehicle crashed or the time is out.119        """...multilane_env.py
Source:multilane_env.py  
...75        self.ego_x0 = None76        EnvViewer.SCREEN_HEIGHT = self.config['screen_height']77        EnvViewer.SCREEN_WIDTH = self.config['screen_width']78        self.reset()79    def _on_route(self, veh=None):80        return True81    82    def _on_road(self, veh=None):83        if veh is None:84            veh = self.vehicle85        return (veh.position[0] < self.ROAD_LENGTH) and (veh.position[0] > 0)86    def _goal_achieved(self, veh=None):87        if veh is None:88            veh = self.vehicle89        return (veh.position[0] > 0.99 * self.ROAD_LENGTH) and \90                self._on_route(veh)91    def reset(self):92        self.steps = 093        self._create_road()94        self._create_vehicles()95        return super(MultilaneEnv, self).reset()96    def step(self, action):97        self.steps += 198        self.previous_action = action99        obs, rew, done, info = super(MultilaneEnv, self).step(action)100        self.episode_travel = self.vehicle.position[0] - self.ego_x0101        return (obs, rew, done, info)102    def _create_road(self):103        """104            Create a road composed of straight adjacent lanes.105        """106        self.road = Road(network=RoadNetwork.straight_road_network(lanes=self.config["lanes_count"], length=self.ROAD_LENGTH),107                         np_random=self.np_random)108    def _create_vehicles(self):109        """110            Create some new random vehicles of a given type, and add them on the road.111        """112        self.vehicle = MDPVehicle.create_random(road=self.road,113                                                velocity=np.random.randint(low=15,high=35),114                                                spacing=self.config["initial_spacing"],115                                                config=self.config)116        self.vehicle.is_ego_vehicle = True117        self.road.vehicles.append(self.vehicle)118        self.ego_x0 = self.vehicle.position[0]119        vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])120        ahead_vehicles = self.config["vehicles_count"] // 2121        behind_vehicles = self.config["vehicles_count"] - ahead_vehicles122        for _ in range(ahead_vehicles):123            self.road.vehicles.append(vehicles_type.create_random(road=self.road,124                                                                  ahead=True,125                                                                  config=self.config)126                                     )127        for _ in range(behind_vehicles):128            self.road.vehicles.append(vehicles_type.create_random(road=self.road,129                                                                  ahead=False,130                                                                  config=self.config)131                                     )132        for _ in range(10):133            self.road.vehicles.append(Obstacle.create_random(road=self.road,134                                                             ahead=False,135                                                             config=self.config136                                                             )137                                     )138        # Add the virtual obstacles139        lane = self.road.network.lanes_list()[0]140        x0 = lane.length/2141        position = lane.position(x0, -3.5)142        lane_index = self.road.network.get_closest_lane_index(143                                                            position=position,144                                                            heading=0  145                                                             )  146        virtual_obstacle_left =       Obstacle(self.road,147                                               position=position,148                                               heading=lane.heading_at(x0),149                                               velocity=0,150                                               target_velocity=0,151                                               lane_index=lane_index,152                                               target_lane_index=lane_index,                     153                                               enable_lane_change=False,154                                               config=self.config)155        virtual_obstacle_left.LENGTH = lane.length156        virtual_obstacle_left.virtual = True157        self.road.vehicles.append(virtual_obstacle_left)158        self.road.virtual_vehicles.append(virtual_obstacle_left)159        lane = self.road.network.lanes_list()[-1]160        x0 = lane.length/2161        position = lane.position(x0, 3.5)162        virtual_obstacle_right =      Obstacle(self.road,163                                               position=position,164                                               heading=lane.heading_at(x0),165                                               velocity=0,166                                               target_velocity=0,167                                               lane_index=lane_index,168                                               target_lane_index=lane_index,                     169                                               enable_lane_change=False,170                                               config=self.config)171        virtual_obstacle_right.LENGTH = lane.length172        virtual_obstacle_right.virtual = True173        self.road.vehicles.append(virtual_obstacle_right)174        self.road.virtual_vehicles.append(virtual_obstacle_right)175    def _reward(self, action):176        """177        The reward is defined to foster driving at high speed, on the rightmost lanes, and to avoid collisions.178        :param action: the last action performed179        :return: the corresponding reward180        """181        action_lookup = dict(map(reversed, AbstractEnv.ACTIONS.items()))182        action_reward = {action_lookup['LANE_LEFT']: self.LANE_CHANGE_REWARD, 183                         action_lookup['IDLE']: 0, 184                         action_lookup['LANE_RIGHT']: self.LANE_CHANGE_REWARD, 185                         action_lookup['FASTER']: 0, 186                         action_lookup['SLOWER']: 0,187                         action_lookup['LANE_LEFT_AGGRESSIVE']: self.AGGRESSIVE_LANE_CHANGE_REWARD,188                         action_lookup['LANE_RIGHT_AGGRESSIVE']: self.AGGRESSIVE_LANE_CHANGE_REWARD189                         }190        neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)191        collision_reward = self.config["COLLISION_REWARD"] * self.vehicle.crashed192        velocity_reward = self.config["VELOCITY_REWARD"] * (self.vehicle.velocity_index -1) / (self.vehicle.SPEED_COUNT - 1)193        if (velocity_reward > 0):194            velocity_reward *= self._on_route()195        goal_reward = self.config["GOAL_REWARD"]196        if self.vehicle.crashed:197            reward = collision_reward + min(0.0, velocity_reward + action_reward[action])198        elif self._goal_achieved():199            reward = goal_reward + velocity_reward + action_reward[action]200        else:201            reward = velocity_reward + action_reward[action]202        return reward203    def _is_terminal(self):204        """205            The episode is over if the ego vehicle crashed or the time is out.206        """207        return  self.vehicle.crashed or \208                self._goal_achieved() or \...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
