How to use _on_route method in Playwright Python

Best Python code snippet using playwright-python

mod_turnByTurn.py

Source:mod_turnByTurn.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2#----------------------------------------------------------------------------3# A turn by turn navigation module.4#----------------------------------------------------------------------------5# Copyright 2007, Oliver White6#7# This program is free software: you can redistribute it and/or modify8# it under the terms of the GNU General Public License as published by9# the Free Software Foundation, either version 3 of the License, or10# (at your option) any later version.11#12# This program is distributed in the hope that it will be useful,13# but WITHOUT ANY WARRANTY; without even the implied warranty of14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the15# GNU General Public License for more details.16#17# You should have received a copy of the GNU General Public License18# along with this program. If not, see <http://www.gnu.org/licenses/>.19#---------------------------------------------------------------------------20from modules.base_module import RanaModule21from core import geo22from core import threads23from core import constants24from core.signal import Signal25import time26from threading import RLock27REROUTE_CHECK_INTERVAL = 5000 # in ms28# in m/s, about 46 km/h - if this speed is reached, the rerouting threshold is multiplied29# by REROUTING_THRESHOLD_MULTIPLIER30INCREASE_REROUTING_THRESHOLD_SPEED = 2031REROUTING_DEFAULT_THRESHOLD = 3032# not enabled at the moment - needs more field testing33REROUTING_THRESHOLD_MULTIPLIER = 1.034# how many times needs the threshold be crossed to35# trigger rerouting36REROUTING_TRIGGER_COUNT = 337MAX_CONSECUTIVE_AUTOMATIC_REROUTES = 338AUTOMATIC_REROUTE_COUNTER_EXPIRATION_TIME = 600 # in seconds39def getModule(*args, **kwargs):40 return TurnByTurn(*args, **kwargs)41class TurnByTurn(RanaModule):42 """A turn by turn navigation module."""43 def __init__(self, *args, **kwargs):44 RanaModule.__init__(self, *args, **kwargs)45 # initial colors46 self.navigationBoxBackground = (0, 0, 1, 0.3) # very transparent blue47 self.navigationBoxText = (1, 1, 1, 1) # non-transparent white48 self._tbt_worker_lock = RLock()49 self._tbt_worker_enabled = False50 self._go_to_initial_state()51 self._automatic_reroute_counter = 0 # counts consecutive automatic reroutes52 self._last_automatic_reroute_timestamp = time.time()53 # reroute even though the route was not yet reached (for special cases)54 self._override_route_reached = False55 # signals56 self.navigation_started = Signal()57 self.navigation_stopped = Signal()58 self.destination_reached = Signal()59 self.rerouting_triggered = Signal()60 self.current_step_changed = Signal()61 def _go_to_initial_state(self):62 """restore initial state"""63 self._route = None64 self._current_step_index_value = 065 self._current_step_indicator = None66 self._espeak_first_and_half_trigger = False67 self._espeak_first_trigger = False68 self._espeak_second_trigger = False69 self._current_distance = None70 self._current_step = None71 self._navigation_box_hidden = False72 self._m_route_length = 073 self._location_watch_id = None74 self._on_route = False75 # rerouting is enabled once the route is reached for the first time76 self._route_reached = False77 self._rerouting_threshold_multiplier = 1.078 self._rerouting_threshold_crossed_counter = 079 def firstTime(self):80 icons = self.m.get('icons', None)81 if icons:82 icons.subscribeColorInfo(self, self._colors_changed_cb)83 def _colors_changed_cb(self, colors):84 self.navigationBoxBackground = colors['navigation_box_background'].getCairoColor()85 self.navigationBoxText = colors['navigation_box_text'].getCairoColor()86 def handleMessage(self, message, messageType, args):87 if message == 'start':88 if messageType == 'ms':89 self.start_tbt(from_where=args)90 elif message == 'stop':91 self.stop_tbt()92 elif message == 'reroute': # manual rerouting93 # reset automatic reroute counter94 self._automatic_reroute_counter = 095 self._reroute()96 elif message == "toggleBoxHiding":97 self.log.info("toggling navigation box visibility")98 self._navigation_box_hidden = not self._navigation_box_hidden99 elif message == "switchToPreviousTurn":100 self.switch_to_previous_step()101 elif message == "switchToNextTurn":102 self.switch_to_next_step()103 elif message == "showMessageInsideNotification":104 if self.current_step:105 message = "<i>turn description:</i>\n%s" % self.current_step.description106 if self.dmod.has_custom_notification_support:107 self.dmod.notify(message, 7000)108 # TODO: add support for modRana notifications once they support line wrapping109 def _reroute_auto(self):110 """This function is called when automatic rerouting is triggered."""111 # check time from last automatic reroute112 dt = time.time() - self._last_automatic_reroute_timestamp113 if dt >= AUTOMATIC_REROUTE_COUNTER_EXPIRATION_TIME:114 # reset the automatic reroute counter115 self._automatic_reroute_counter = 0116 self.log.debug('automatic reroute counter expired, clearing')117 # on some routes, when we are moving away from the start of the route, it118 # is needed to reroute a couple of times before the correct way is found119 # on the other hand there should be a limit on the number of times120 # modRana reroutes in a row121 #122 # SOLUTION:123 # 1. enable automatic rerouting even though the route was not yet reached124 # (as we are moving away from it)125 # 2. do this only a limited number of times (up to 3 times in a row)126 # 3. the counter is reset by manual rerouting, by reaching the route or after 10 minutes127 if self._automatic_reroute_counter < MAX_CONSECUTIVE_AUTOMATIC_REROUTES:128 self.log.debug('faking that route was reached to enable new rerouting')129 self._override_route_reached = True130 else:131 self.log.info('tbt: too many consecutive reroutes (%d),', self._automatic_reroute_counter)132 self.log.info('reach the route to enable automatic rerouting')133 self.log.info('or reroute manually')134 # increment the automatic reroute counter & update the timestamp135 self._automatic_reroute_counter += 1136 self._last_automatic_reroute_timestamp = time.time()137 # trigger rerouting138 self._reroute()139 self.rerouting_triggered()140 def _reroute(self):141 # 1. say rerouting is in progress142 voiceMessage = "rerouting"143 voice = self.m.get('voice', None)144 if voice:145 voice.say(voiceMessage, "en") # make sure rerouting said with english voice146 time.sleep(2) # TODO: improve this147 route = self.m.get('route', None)148 if route:149 # 2. get a new route from current position to destination150 route.reroute()151 # 3. restart routing for to this new route from the closest point152 self.sendMessage("ms:turnByTurn:start:closest")153 @property154 def automatic_rerouting_enabled(self):155 """Report if automatic rerouting is enabled.156 Automatic rerouting is considered to be enabled if the reroutingThreshold key157 holds a value that is not None (in general a string representation of a floating158 point number).159 :return: if automatic rerouting is enabled160 :rtype: bool161 """162 return self.get('reroutingThreshold', REROUTING_DEFAULT_THRESHOLD)163 def _say_turn(self, message, distanceInMeters, force_language_code=False):164 """Say a text-to-speech message about a turn.165 This basically wraps the simple say method from voice and adds some more information,166 like current distance to the turn.167 """168 voice = self.m.get('voice', None)169 units = self.m.get('units', None)170 if voice and units:171 (dist_string, short, long) = units.humanRound(distanceInMeters)172 if dist_string == "0":173 dist_string = ""174 else:175 dist_string = '<p xml:lang="en">in <emphasis level="strong">' + dist_string + ' ' + long + '</emphasis></p><br>'176 # TODO: language specific distance strings177 text = dist_string + message178 # """ the message can contain unicode, this might cause an exception when printing it179 # in some systems (SHR-u on Neo, for example)"""180 # try:181 # print("saying: %s" % text)182 # pass183 # except UnicodeEncodeError:184 # print("voice: printing the current message to stdout failed do to unicode conversion error")185 if force_language_code:186 espeak_language_code = force_language_code187 else:188 # the espeak language code is the first part of this whitespace delimited string189 espeak_language_code = self.get('directionsLanguage', 'en en').split(" ")[0]190 return voice.say(text, espeak_language_code)191 @property192 def _max_step_index(self):193 return self._route.message_point_count - 1194 def _get_starting_step(self, which='first'):195 if self._route:196 if which == 'first':197 return self._get_step(0)198 if which == 'closest':199 return self._get_closest_step()200 def _get_closest_step(self):201 """Get the geographically closest step."""202 proj = self.m.get('projection', None) # we also need the projection module203 pos = self.get('pos', None) # and current position204 if pos and proj:205 (lat1, lon1) = pos206 temp_steps = self._route.message_points207 for step in temp_steps:208 (lat2, lon2) = step.getLL()209 step.current_distance = geo.distance(lat1, lon1, lat2, lon2) * 1000 # km to m210 closest_step = sorted(temp_steps, key=lambda x: x.current_distance)[0]211 return closest_step212 def _get_step(self, index):213 """Return steps for valid index, None otherwise."""214 max_index = self._max_step_index215 if index > max_index or index < -(max_index + 1):216 self.log.error("wrong turn index: %d, max index is: %d", index, max_index)217 return None218 else:219 return self._route.get_message_point_by_index(index)220 def _get_step_index(self, step):221 return self._route.get_message_point_index(step)222 @property223 def current_step(self):224 """Return current step."""225 return self._route.get_message_point_by_index(self._current_step_index)226 @current_step.setter227 def current_step(self, step):228 """Set a given step as current step."""229 mp_index = self._route.get_message_point_index(step)230 self._current_step_index = mp_index231 self.current_step_changed(step)232 @property233 def _current_step_index(self):234 return self._current_step_index_value235 @_current_step_index.setter236 def _current_step_index(self, index):237 self._current_step_index_value = index238 # trigger a navigation update every time current step index is set239 self._do_navigation_update()240 def switch_to_previous_step(self):241 """Switch to previous step and clean up."""242 next_index = self._current_step_index - 1243 if next_index >= 0:244 self._current_step_index = next_index245 self._espeak_first_trigger = False246 self._espeak_second_trigger = False247 self.log.info("switching to previous step")248 self.current_step_changed(self.current_step)249 else:250 self.log.info("previous step reached")251 def switch_to_next_step(self):252 """Switch to next step and clean up."""253 max_index = self._max_step_index254 next_index = self._current_step_index + 1255 if next_index <= max_index:256 self._current_step_index = next_index257 self._espeak_first_and_half_trigger = False258 self._espeak_first_trigger = False259 self._espeak_second_trigger = False260 self.log.info("switching to next step")261 self.current_step_changed(self.current_step)262 else:263 self.log.info("last step reached")264 self._last_step_reached()265 def _last_step_reached(self):266 """Handle all tasks that are needed once the last step is reached."""267 # disable automatic rerouting268 self._stop_tbt_worker()269 # automatic rerouting needs to be disabled to prevent rerouting270 # once the destination was reached271 self.destination_reached()272 def enabled(self):273 """Return if Turn by Turn navigation is enabled."""274 if self._route:275 return True276 else:277 return False278 def start_tbt(self, from_where='first'):279 """Start Turn-by-turn navigation."""280 # clean up any possible previous navigation data281 self._go_to_initial_state()282 # NOTE: turn and step are used interchangeably in the documentation283 m = self.m.get('route', None)284 if m:285 route = m.get_current_directions()286 if route: # is the route nonempty ?287 self._route = route288 # get route in radians for automatic rerouting289 self.radiansRoute = route.get_points_lle_radians(drop_elevation=True)290 # start rerouting watch291 self._start_tbt_worker()292 # for some reason the combined distance does not account for the last step293 self._m_route_length = route.length294 # some statistics295 meters_per_sec_speed = self.get('metersPerSecSpeed', None)296 dt = m.route_lookup_duration297 self.log.info("route lookup took: %f s" % dt)298 if dt and meters_per_sec_speed:299 dm = dt * meters_per_sec_speed300 self.log.info("distance traveled during lookup: %f m" % dm)301 # the duration of the road lookup and other variables are currently not used302 # in the heuristics but might be added later to make the heuristics more robust303 # now we decide if we use the closest turn, or the next one,304 # as we might be already past it and on our way to the next turn305 cs = self._get_closest_step() # get geographically closest step306 pos = self.get('pos', None) # get current position307 p_reached_dist = int(self.get('pointReachedDistance', 30)) # get the trigger distance308 next_turn_id = self._get_step_index(cs) + 1309 next_step = self._get_step(next_turn_id)310 # check if we have all the data needed for our heuristics311 self.log.info("trying to guess correct step to start navigation")312 if next_step and pos and p_reached_dist:313 (lat, lon) = pos314 (csLat, csLon) = cs.getLL()315 (nsLat, nsLon) = next_step.getLL()316 pos2next_step = geo.distance(lat, lon, nsLat, nsLon) * 1000317 pos2current_step = geo.distance(lat, lon, csLat, csLon) * 1000318 current_step2next_step = geo.distance(csLat, csLon, nsLat, nsLon) * 1000319 # self.log.debug("pos",(lat,lon))320 # self.log.debug("cs",(csLat,csLon))321 # self.log.debug("ns",(nsLat,nsLon))322 self.log.debug("position to next turn: %f m" % pos2next_step)323 self.log.debug("position to current turn: %f m" % pos2current_step)324 self.log.debug("current turn to next turn: %f m" % current_step2next_step)325 self.log.debug("turn reached trigger distance: %f m" % p_reached_dist)326 if pos2current_step > p_reached_dist:327 # this means we are out of the "capture circle" of the closest step328 # what is more distant, the closest or the next step ?329 if pos2next_step < current_step2next_step:330 # we are mostly probably already past the closest step,331 # so we switch to the next step at once332 self.log.debug("already past closest turn, switching to next turn")333 self.current_step = next_step334 # we play the message for the next step,335 # with current distance to this step,336 # to assure there is some voice output immediately after337 # getting a new route or rerouting"""338 plaintextMessage = next_step.ssml_message339 self._say_turn(plaintextMessage, pos2next_step)340 else:341 # we have probably not yet reached the closest step,342 # so we start navigation from it343 self.log.debug("closest turn not yet reached")344 self.current_step = cs345 else:346 # we are inside the "capture circle" of the closest step,347 # this means the navigation will trigger the voice message by itself348 # and correctly switch to next step349 # -> no need to switch to next step from here350 self.log.debug("inside reach distance of closest turn")351 self.current_step = cs352 else:353 # we dont have some of the data, that is needed to decide354 # if we start the navigation from the closest step of from the step that is after it355 # -> we just start from the closest step356 self.log.debug("not enough data to decide, using closest turn")357 self.current_step = cs358 self._do_navigation_update() # run a first time navigation update359 self._location_watch_id = self.watch('locationUpdated', self.location_update_cb)360 self.log.info("started and ready")361 # trigger the navigation-started signal362 self.navigation_started()363 def stop_tbt(self):364 """Stop Turn-by-Turn navigation."""365 # remove location watch366 if self._location_watch_id:367 self.removeWatch(self._location_watch_id)368 # cleanup369 self._go_to_initial_state()370 self._stop_tbt_worker()371 self.log.info("stopped")372 self.navigation_stopped()373 def location_update_cb(self, key, newValue, oldValue):374 """Position changed, do a tbt navigation update."""375 # TODO: use a Signal for this ?376 if key == "locationUpdated": # just to be sure377 self._do_navigation_update()378 else:379 self.log.error("invalid key: %r", key)380 def _do_navigation_update(self):381 """Do a navigation update."""382 # make sure there really are some steps383 if not self._route:384 self.log.error("no route")385 return386 pos = self.get('pos', None)387 if pos is None:388 self.log.error("skipping update, invalid position")389 return390 # get/compute/update necessary the values391 lat1, lon1 = pos392 lat2, lon2 = self.current_step.getLL()393 current_distance = geo.distance(lat1, lon1, lat2, lon2) * 1000 # km to m394 self._current_distance = current_distance # update current distance395 # use some sane minimum distance396 distance = int(self.get('minAnnounceDistance', 100))397 # GHK: make distance speed-sensitive398 #399 # I came up with this formula after a lot of experimentation with400 # gnuplot. The idea is to give the user some simple parameters to401 # adjust yet let him have a lot of control. There are five402 # parameters in the equation:403 #404 # low_speed Speed below which the pre-announcement time is constant.405 # low_time Announcement time at and below lowSpeed.406 # high_speed Speed above which the announcement time is constant.407 # high_time Announcement time at and above highSpeed.408 # power Exponential power used in the formula; good values are 0.5-5409 #410 # The speeds are in m/s. Obviously highXXX must be greater than lowXXX.411 # If power is 1.0, announcement times increase linearly above lowSpeed.412 # If power < 1.0, times rise rapidly just above lowSpeed and more413 # gradually approaching highSpeed. If power > 1.0, times rise414 # gradually at first and rapidly near highSpeed. I like power > 1.0.415 #416 # The reasoning is that at high speeds you are probably on a417 # motorway/freeway and will need extra time to get into the proper418 # lane to take your exit. That reasoning is pretty harmless on a419 # high-speed two-lane road, but it breaks down if you are stuck in420 # heavy traffic on a four-lane freeway (like in Los Angeles421 # where I live) because you might need quite a while to work your422 # way across the traffic even though you're creeping along. But I423 # don't know a good way to detect whether you're on a multi-lane road,424 # I chose speed as an acceptable proxy.425 #426 # Regardless of speed, we always warn a certain distance ahead (see427 # "distance" above). That distance comes from the value in the current428 # step of the directions.429 #430 # BTW, if you want to use gnuplot to play with the curves, try:431 # max(a,b) = a > b ? a : b432 # min(a,b) = a < b ? a : b433 # warn(x,t1,s1,t2,s2,p) = min(t2,(max(s1,x)-s1)**p*(t2-t1)/(s2-s1)**p+t1)434 # plot [0:160][0:] warn(x,10,50,60,100,2.0)435 #436 meters_per_sec_speed = self.get('metersPerSecSpeed', None)437 point_reached_distance = int(self.get('pointReachedDistance', 30))438 if meters_per_sec_speed:439 # check if we can miss the point by going too fast -> mps speed > point reached distance440 # also enlarge the rerouting threshold as it looks like it needs to be larger441 # when moving at high speed to prevent unnecessary rerouting442 if meters_per_sec_speed > point_reached_distance * 0.75:443 point_reached_distance = meters_per_sec_speed * 2444 # self.log.debug("tbt: enlarging point reached distance to: %1.2f m due to large speed (%1.2f m/s)". (pointReachedDistance, metersPerSecSpeed)445 if meters_per_sec_speed > INCREASE_REROUTING_THRESHOLD_SPEED:446 self._rerouting_threshold_multiplier = REROUTING_THRESHOLD_MULTIPLIER447 else:448 self._rerouting_threshold_multiplier = 1.0449 # speed & time based triggering450 low_speed = float(self.get('minAnnounceSpeed', 13.89))451 high_speed = float(self.get('maxAnnounceSpeed', 27.78))452 high_speed = max(high_speed, low_speed + 0.1)453 low_time = int(self.get('minAnnounceTime', 10))454 high_time = int(self.get('maxAnnounceTime', 60))455 high_time = max(high_time, low_time)456 power = float(self.get('announcePower', 2.0))457 warn_time = (max(low_speed, meters_per_sec_speed) - low_speed) ** power \458 * (high_time - low_time) / (high_speed - low_speed) ** power \459 + low_time460 warn_time = min(high_time, warn_time)461 distance = max(distance, warn_time * meters_per_sec_speed)462 if self.get('debugTbT', False):463 self.log.debug("#####")464 self.log.debug("min/max announce time: %d/%d s", low_time, high_time)465 self.log.debug("trigger distance: %1.2f m (%1.2f s warning)", distance, distance / float(meters_per_sec_speed))466 self.log.debug("current distance: %1.2f m", current_distance)467 self.log.debug("current speed: %1.2f m/s (%1.2f km/h)", meters_per_sec_speed, meters_per_sec_speed * 3.6)468 self.log.debug("point reached distance: %f m", point_reached_distance)469 self.log.debug("1. triggered=%r, 1.5. triggered=%r, 2. triggered=%r",470 self._espeak_first_trigger, self._espeak_first_and_half_trigger, self._espeak_second_trigger)471 if warn_time > 30:472 self.log.debug("optional (20 s) trigger distance: %1.2f", 20.0 * meters_per_sec_speed)473 if current_distance <= point_reached_distance:474 # this means we reached the point"""475 if self._espeak_second_trigger is False:476 self.log.debug("triggering espeak nr. 2")477 # say the message without distance478 plaintextMessage = self.current_step.ssml_message479 # consider turn said even if it was skipped (ignore errors)480 self._say_turn(plaintextMessage, 0)481 self.current_step.visited = True # mark this point as visited482 self._espeak_first_trigger = True # everything has been said, again :D483 self._espeak_second_trigger = True # everything has been said, again :D484 self.switch_to_next_step() # switch to next step485 else:486 if current_distance <= distance:487 # this means we reached an optimal distance for saying the message"""488 if self._espeak_first_trigger is False:489 self.log.debug("triggering espeak nr. 1")490 plaintextMessage = self.current_step.ssml_message491 if self._say_turn(plaintextMessage, current_distance):492 self._espeak_first_trigger = True # first message done493 if self._espeak_first_and_half_trigger is False and warn_time > 30:494 if current_distance <= (20.0 * meters_per_sec_speed):495 # in case that the warning time gets too big, add an intermediate warning at 20 seconds496 # NOTE: this means it is said after the first trigger497 plaintextMessage = self.current_step.ssml_message498 if self._say_turn(plaintextMessage, current_distance):499 self._espeak_first_and_half_trigger = True # intermediate message done500 ## automatic rerouting ##501 # is automatic rerouting enabled from options502 # enabled == threshold that is not not None503 if self.automatic_rerouting_enabled:504 # rerouting is enabled only once the route is reached for the first time505 if self._on_route and not self._route_reached:506 self._route_reached = True507 self._automatic_reroute_counter = 0508 self.log.info('route reached, rerouting enabled')509 # did the TBT worker detect that the rerouting threshold was reached ?510 if self._rerouting_conditions_met():511 # test if enough consecutive divergence point were recorded512 if self._rerouting_threshold_crossed_counter >= REROUTING_TRIGGER_COUNT:513 # reset the routeReached override514 self._override_route_reached = False515 # trigger rerouting516 self._reroute_auto()517 else:518 # reset the counter519 self._rerouting_threshold_crossed_counter = 0520 def _rerouting_conditions_met(self):521 return (self._route_reached or self._override_route_reached) and not self._on_route522 def _following_route(self):523 """Are we still following the route or is rerouting needed ?"""524 start1 = time.perf_counter()525 pos = self.get('pos', None)526 proj = self.m.get('projection', None)527 if pos and proj:528 pLat, pLon = pos529 # we use Radians to get rid of radian conversion overhead for530 # the geographic distance computation method531 radians_ll = self.radiansRoute532 pLat = geo.radians(pLat)533 pLon = geo.radians(pLon)534 if len(radians_ll) == 0:535 self.log.error("Divergence: can't follow a zero point route")536 return False537 elif len(radians_ll) == 1: # 1 point route538 aLat, aLon = radians_ll[0]539 min_distance = geo.distance_approx_radians(pLat, pLon, aLat, aLon)540 else: # 2+ points route541 aLat, aLon = radians_ll[0]542 bLat, bLon = radians_ll[1]543 min_distance = geo.distance_point_to_line_radians(pLat, pLon, aLat, aLon, bLat, bLon)544 aLat, aLon = bLat, bLon545 for point in radians_ll[1:]:546 bLat, bLon = point547 dist = geo.distance_point_to_line_radians(pLat, pLon, aLat, aLon, bLat, bLon)548 if dist < min_distance:549 min_distance = dist550 aLat, aLon = bLat, bLon551 # the multiplier tries to compensate for high speed movement552 threshold = float(553 self.get('reroutingThreshold', REROUTING_DEFAULT_THRESHOLD)) * self._rerouting_threshold_multiplier554 self.log.debug("Divergence from route: %1.2f/%1.2f m computed in %1.0f ms",555 min_distance * 1000, float(threshold), (1000 * (time.perf_counter() - start1)))556 return min_distance * 1000 < threshold557 def _start_tbt_worker(self):558 with self._tbt_worker_lock:559 # reuse previous thread or start new one560 if self._tbt_worker_enabled:561 self.log.info("reusing TBT worker thread")562 else:563 self.log.info("starting new TBT worker thread")564 t = threads.ModRanaThread(name=constants.THREAD_TBT_WORKER,565 target=self._tbt_worker)566 threads.threadMgr.add(t)567 self._tbt_worker_enabled = True568 def _stop_tbt_worker(self):569 with self._tbt_worker_lock:570 self.log.info("stopping the TBT worker thread")571 self._tbt_worker_enabled = False572 def _tbt_worker(self):573 """This function runs in its own thread and checks if we are still following the route."""574 self.log.info("TBTWorker: started")575 # The _tbt_worker_enabled variable is needed as once the end of the route is reached576 # there will be a route set but further rerouting should not be performed.577 while True:578 with self._tbt_worker_lock:579 # Either tbt has been shut down (no route is set)580 # or rerouting is no longer needed.581 if not self._route or not self._tbt_worker_enabled:582 self.log.info("TBTWorker: shutting down")583 break584 # first make sure automatic rerouting is enabled585 # eq. reroutingThreshold != None586 if self.automatic_rerouting_enabled:587 # check if we are still following the route588 # self.log.debug('TBTWorker: checking divergence from route')589 self._on_route = self._following_route()590 if self._rerouting_conditions_met():591 self.log.info('TBTWorker: divergence detected')592 # switch to quick updates593 for i in range(0, REROUTING_TRIGGER_COUNT + 1):594 time.sleep(1)595 onRoute = self._following_route()596 if onRoute: # divergence stopped597 self._on_route = onRoute598 self.log.info('TBTWorker: false alarm')599 break600 else: # still diverging from current route601 self._on_route = onRoute602 # increase divergence counter603 self._rerouting_threshold_crossed_counter += 1604 self.log.debug('TBTWorker: increasing divergence counter (%d)',605 self._rerouting_threshold_crossed_counter)606 time.sleep(REROUTE_CHECK_INTERVAL / 1000.0)607 def shutdown(self):608 # cleanup...

Full Screen

Full Screen

_page.py

Source:_page.py Github

copy

Full Screen

...193 ),194 )195 self._channel.on(196 "route",197 lambda params: self._on_route(198 from_channel(params["route"]), from_channel(params["request"])199 ),200 )201 self._channel.on(202 "video",203 lambda params: cast(Video, self.video)._set_relative_path(204 params["relativePath"]205 ),206 )207 self._channel.on(208 "webSocket",209 lambda params: self.emit(210 Page.Events.WebSocket, from_channel(params["webSocket"])211 ),212 )213 self._channel.on(214 "worker", lambda params: self._on_worker(from_channel(params["worker"]))215 )216 def _set_browser_context(self, context: "BrowserContext") -> None:217 self._browser_context = context218 self._timeout_settings = TimeoutSettings(context._timeout_settings)219 def _on_request_failed(220 self,221 request: Request,222 response_end_timing: float,223 failure_text: str = None,224 ) -> None:225 request._failure_text = failure_text226 if request._timing:227 request._timing["responseEnd"] = response_end_timing228 self.emit(Page.Events.RequestFailed, request)229 def _on_request_finished(230 self, request: Request, response_end_timing: float231 ) -> None:232 if request._timing:233 request._timing["responseEnd"] = response_end_timing234 self.emit(Page.Events.RequestFinished, request)235 def _on_frame_attached(self, frame: Frame) -> None:236 frame._page = self237 self._frames.append(frame)238 self.emit(Page.Events.FrameAttached, frame)239 def _on_frame_detached(self, frame: Frame) -> None:240 self._frames.remove(frame)241 frame._detached = True242 self.emit(Page.Events.FrameDetached, frame)243 def _on_route(self, route: Route, request: Request) -> None:244 for handler_entry in self._routes:245 if handler_entry.matcher.matches(request.url):246 result = cast(Any, handler_entry.handler)(route, request)247 if inspect.iscoroutine(result):248 asyncio.create_task(result)249 return250 self._browser_context._on_route(route, request)251 def _on_binding(self, binding_call: "BindingCall") -> None:252 func = self._bindings.get(binding_call._initializer["name"])253 if func:254 asyncio.create_task(binding_call.call(func))255 self._browser_context._on_binding(binding_call)256 def _on_worker(self, worker: "Worker") -> None:257 self._workers.append(worker)258 worker._page = self259 self.emit(Page.Events.Worker, worker)260 def _on_close(self) -> None:261 self._is_closed = True262 self._browser_context._pages.remove(self)263 self.emit(Page.Events.Close)264 def _on_crash(self) -> None:...

Full Screen

Full Screen

two_way_env.py

Source:two_way_env.py Github

copy

Full Screen

...75 #self.print_obs_space(ref_vehicle=self.vehicle)76 #self.print_obs_space(ref_vehicle=self.idmdp_vehicle)77 #self._set_curriculam(curriculam_reward_threshold=0.6*self.GOAL_REWARD)78 return (obs, rew, done, info)79 def _on_route(self, veh=None):80 if veh is None:81 veh = self.vehicle82 lane_ID = veh.lane_index[2]83 onroute = (lane_ID == 1)84 return onroute85 86 def _on_road(self, veh=None):87 if veh is None:88 veh = self.vehicle89 return (veh.position[0] < self.ROAD_LENGTH) and (veh.position[0] > 0)90 def _goal_achieved(self, veh=None):91 if veh is None:92 veh = self.vehicle93 return (veh.position[0] > self.config["GOAL_LENGTH"] and \94 self._on_route(veh))95 def _reward(self, action):96 """97 The vehicle is rewarded for driving with high velocity98 :param action: the action performed99 :return: the reward of the state-action transition100 """101 #neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)102 collision_reward = self.config["COLLISION_REWARD"] * self.vehicle.crashed103 velocity_reward = self.config["VELOCITY_REWARD"] * (self.vehicle.velocity_index -1) / (self.vehicle.SPEED_COUNT - 1)104 if (velocity_reward > 0):105 velocity_reward *= self._on_route()106 goal_reward = self.config["GOAL_REWARD"]107 if self.vehicle.crashed:108 reward = collision_reward + min(0.0, velocity_reward)109 elif self._goal_achieved():110 reward = goal_reward + velocity_reward111 else:112 reward = velocity_reward113 if not self.vehicle.action_validity:114 reward = reward + self.config["INVALID_ACTION_REWARD"]115 return reward116 def _is_terminal(self):117 """118 The episode is over if the ego vehicle crashed or the time is out.119 """...

Full Screen

Full Screen

multilane_env.py

Source:multilane_env.py Github

copy

Full Screen

...75 self.ego_x0 = None76 EnvViewer.SCREEN_HEIGHT = self.config['screen_height']77 EnvViewer.SCREEN_WIDTH = self.config['screen_width']78 self.reset()79 def _on_route(self, veh=None):80 return True81 82 def _on_road(self, veh=None):83 if veh is None:84 veh = self.vehicle85 return (veh.position[0] < self.ROAD_LENGTH) and (veh.position[0] > 0)86 def _goal_achieved(self, veh=None):87 if veh is None:88 veh = self.vehicle89 return (veh.position[0] > 0.99 * self.ROAD_LENGTH) and \90 self._on_route(veh)91 def reset(self):92 self.steps = 093 self._create_road()94 self._create_vehicles()95 return super(MultilaneEnv, self).reset()96 def step(self, action):97 self.steps += 198 self.previous_action = action99 obs, rew, done, info = super(MultilaneEnv, self).step(action)100 self.episode_travel = self.vehicle.position[0] - self.ego_x0101 return (obs, rew, done, info)102 def _create_road(self):103 """104 Create a road composed of straight adjacent lanes.105 """106 self.road = Road(network=RoadNetwork.straight_road_network(lanes=self.config["lanes_count"], length=self.ROAD_LENGTH),107 np_random=self.np_random)108 def _create_vehicles(self):109 """110 Create some new random vehicles of a given type, and add them on the road.111 """112 self.vehicle = MDPVehicle.create_random(road=self.road,113 velocity=np.random.randint(low=15,high=35),114 spacing=self.config["initial_spacing"],115 config=self.config)116 self.vehicle.is_ego_vehicle = True117 self.road.vehicles.append(self.vehicle)118 self.ego_x0 = self.vehicle.position[0]119 vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])120 ahead_vehicles = self.config["vehicles_count"] // 2121 behind_vehicles = self.config["vehicles_count"] - ahead_vehicles122 for _ in range(ahead_vehicles):123 self.road.vehicles.append(vehicles_type.create_random(road=self.road,124 ahead=True,125 config=self.config)126 )127 for _ in range(behind_vehicles):128 self.road.vehicles.append(vehicles_type.create_random(road=self.road,129 ahead=False,130 config=self.config)131 )132 for _ in range(10):133 self.road.vehicles.append(Obstacle.create_random(road=self.road,134 ahead=False,135 config=self.config136 )137 )138 # Add the virtual obstacles139 lane = self.road.network.lanes_list()[0]140 x0 = lane.length/2141 position = lane.position(x0, -3.5)142 lane_index = self.road.network.get_closest_lane_index(143 position=position,144 heading=0 145 ) 146 virtual_obstacle_left = Obstacle(self.road,147 position=position,148 heading=lane.heading_at(x0),149 velocity=0,150 target_velocity=0,151 lane_index=lane_index,152 target_lane_index=lane_index, 153 enable_lane_change=False,154 config=self.config)155 virtual_obstacle_left.LENGTH = lane.length156 virtual_obstacle_left.virtual = True157 self.road.vehicles.append(virtual_obstacle_left)158 self.road.virtual_vehicles.append(virtual_obstacle_left)159 lane = self.road.network.lanes_list()[-1]160 x0 = lane.length/2161 position = lane.position(x0, 3.5)162 virtual_obstacle_right = Obstacle(self.road,163 position=position,164 heading=lane.heading_at(x0),165 velocity=0,166 target_velocity=0,167 lane_index=lane_index,168 target_lane_index=lane_index, 169 enable_lane_change=False,170 config=self.config)171 virtual_obstacle_right.LENGTH = lane.length172 virtual_obstacle_right.virtual = True173 self.road.vehicles.append(virtual_obstacle_right)174 self.road.virtual_vehicles.append(virtual_obstacle_right)175 def _reward(self, action):176 """177 The reward is defined to foster driving at high speed, on the rightmost lanes, and to avoid collisions.178 :param action: the last action performed179 :return: the corresponding reward180 """181 action_lookup = dict(map(reversed, AbstractEnv.ACTIONS.items()))182 action_reward = {action_lookup['LANE_LEFT']: self.LANE_CHANGE_REWARD, 183 action_lookup['IDLE']: 0, 184 action_lookup['LANE_RIGHT']: self.LANE_CHANGE_REWARD, 185 action_lookup['FASTER']: 0, 186 action_lookup['SLOWER']: 0,187 action_lookup['LANE_LEFT_AGGRESSIVE']: self.AGGRESSIVE_LANE_CHANGE_REWARD,188 action_lookup['LANE_RIGHT_AGGRESSIVE']: self.AGGRESSIVE_LANE_CHANGE_REWARD189 }190 neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)191 collision_reward = self.config["COLLISION_REWARD"] * self.vehicle.crashed192 velocity_reward = self.config["VELOCITY_REWARD"] * (self.vehicle.velocity_index -1) / (self.vehicle.SPEED_COUNT - 1)193 if (velocity_reward > 0):194 velocity_reward *= self._on_route()195 goal_reward = self.config["GOAL_REWARD"]196 if self.vehicle.crashed:197 reward = collision_reward + min(0.0, velocity_reward + action_reward[action])198 elif self._goal_achieved():199 reward = goal_reward + velocity_reward + action_reward[action]200 else:201 reward = velocity_reward + action_reward[action]202 return reward203 def _is_terminal(self):204 """205 The episode is over if the ego vehicle crashed or the time is out.206 """207 return self.vehicle.crashed or \208 self._goal_achieved() or \...

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has gained good popularity. Get to know some history of the Playwright with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright gained huge popularity quickly because of some obliging features such as Playwright Test Generator and Inspector, Playwright Reporter, Playwright auto-waiting mechanism and etc. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating them with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright interact with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and different types of dropdowns, such as single selector and multi-selector get your hands-on with handling alerts and dropdown in Playright testing.
  8. Playwright vs Puppeteer: Get to know about the difference between two testing frameworks and how they are different than one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in Parallel with the LammbdaTest test cloud. Get a step-by-step guide to run your Playwright test on the LambdaTest platform.
  10. Playwright Python Tutorial: Playwright automation framework support all major languages such as Python, JavaScript, TypeScript, .NET and etc. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get your hands on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a consecutive in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful