Best Python code snippet using autotest_python
base.py
Source:base.py  
# pylint: disable=W0311, W0212
# (Original author's note, translated: documented rather poorly, but oh well.)
"""
A general script for contacting with this library
"""
from asyncio import Queue, sleep
from typing import Any, Dict, Optional, List, AsyncGenerator
from .abstract import AbstractBackend, Results, SubscriptionUpdater


class Subscription:
    """This class allows you to iterate through messages.

    A subscription is bound to one ``channel`` and to the ``updater`` that
    fills its internal queue. Iterate it with ``async for``; leaving the
    ``async with`` block sets the queue to ``None``, which ends the iteration
    and makes the updater drop the subscription on its next update.
    """

    def __init__(self, channel: str, updater: SubscriptionUpdater):
        assert isinstance(channel, str), "channel's datatype is str"
        assert isinstance(updater, SubscriptionUpdater), "caster's datatype is SnowieCaster"
        self.channel = channel
        self.updater = updater
        self._queue = Queue()

    async def __aiter__(self) -> Optional[AsyncGenerator]:
        """Yield queued messages, polling the updater while the queue is empty."""
        while True:
            assert isinstance(self.channel, str), "channel's datatype is str"
            while isinstance(self._queue, Queue) and self._queue.empty():
                await self.updater._update_subscriptions(self.channel)
                request_delay = self.updater.get_request_delay()
                if request_delay is None:
                    # No polling interval: stop polling and wait on the queue.
                    break
                await sleep(request_delay)
            if not isinstance(self._queue, Queue):
                # The subscription was closed by __aexit__.
                break
            yield await self._queue.get()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args, **kwargs):
        # Dropping the queue marks the subscription as closed.
        self._queue = None


class SnowieCaster(SubscriptionUpdater):
    """A general class for contacting with this library"""

    def _check_instance_vars(self):
        """This method checks that the instance variables are valid."""
        assert isinstance(self._channel_subs, dict)
        assert isinstance(self._backend, AbstractBackend)
        assert hasattr(self._backend, "_request_delay_limit"), \
            "Haven't _request_delay_limit attribute in backend class"
        assert self._backend._request_delay_limit is None or \
            isinstance(self._backend._request_delay_limit, int), \
            "Bad _request_delay_limit attribute datatype in backend class"

    def get_request_delay(self):
        """Return the polling delay used by subscriptions, or None."""
        return self._request_delay

    async def _update_subscriptions(self, channel: str):
        """Push the backend's data for ``channel`` into every open subscription.

        Returns:
            False when the caster is not started, the channel has no
            subscriptions, or the backend returned no data; True otherwise.
        """
        self._check_instance_vars()
        if not self._is_started:
            return False
        assert isinstance(channel, str), "channel's datatype is str"
        if channel not in self._channel_subs:
            return False
        subs: List[Subscription] = self._channel_subs[channel]
        assert isinstance(subs, list)
        result = Results.NONE_DATA

        async def get_result():
            # Fetch from the backend at most once per update, and only if
            # at least one open subscription actually needs the data.
            nonlocal result
            if result is Results.NONE_DATA:
                result = await self._backend._aget_all(channel)
            return result

        # Iterate over a snapshot: closed subscriptions are removed from
        # `subs` inside the loop, and removing from a list while iterating
        # it directly would skip the element that follows each removal.
        for sub in list(subs):
            assert isinstance(sub, Subscription), "sub's datatype isn't Subscription"
            if not isinstance(sub._queue, Queue):
                sub._queue = None
                subs.remove(sub)
                continue
            data = await get_result()
            if len(data) == 0:
                return False
            for data_result in data:
                await sub._queue.put(data_result)
        return True

    def __init__(self, backend: AbstractBackend, *args, requests_delay=None,
                 auto_start: Optional[bool] = False, **kwargs):
        """Class init method.

        Args:
            backend: backend instance used for all operations.
            *args, **kwargs: forwarded to ``start`` when ``auto_start`` is true.
            requests_delay: polling delay handed to subscriptions; ignored
                (set to None) when the backend's ``_request_delay_limit``
                is None.
            auto_start: start the backend synchronously right away.
        """
        assert isinstance(auto_start, bool), "auto_start's datatype is Optional[bool]"
        self._backend = backend
        self._request_delay = requests_delay
        self._channel_subs: Dict[str, List[Subscription]] = {}
        self._is_started = False
        self._check_instance_vars()
        if backend._request_delay_limit is None:
            self._request_delay = None
        if auto_start:
            self.start(*args, **kwargs)

    def __del__(self):
        """try to stop backend correctly"""
        # __init__ may have failed before `_is_started` was assigned; in that
        # case there is nothing to stop and touching the attribute would raise.
        if not getattr(self, "_is_started", False):
            return
        try:
            self.stop()
        except NotImplementedError:
            pass

    def start(self, *args, **kwargs) -> Any:
        """try to start backend synchronously

        Returns False if already started, else the backend's ``_start`` result.
        """
        if self._is_started:
            return False
        result = self._backend._start(*args, **kwargs)
        self._is_started = True
        return result

    def stop(self) -> Any:
        """try to stop backend synchronously

        Returns False if not started, else the backend's ``_stop`` result.
        """
        if not self._is_started:
            return False
        result = self._backend._stop()
        self._is_started = False
        return result

    async def asubscribe(self, channel: str) -> Subscription:
        """Subscribe to channel updates asynchronously"""
        self._check_instance_vars()
        assert isinstance(channel, str), "channel's datatype is str"
        assert self._is_started, "Backend is not started, SnowieCaster.start()"
        sub = Subscription(channel, self)
        if channel not in self._channel_subs:
            self._channel_subs[channel] = []
        self._channel_subs[channel].append(sub)
        return sub

    async def astart(self, *args, **kwargs) -> Any:
        """Start backend asynchronously"""
        if self._is_started:
            return False
        result = await self._backend._astart(*args, **kwargs)
        self._is_started = True
        return result

    async def astop(self, *args, **kwargs) -> Any:
        """Stop backend asynchronously"""
        if not self._is_started:
            return False
        # The original awaited `_astart` here, which started the backend
        # again instead of stopping it.
        # NOTE(review): `_astop` is assumed to be the async counterpart of
        # `_stop` on AbstractBackend -- confirm against abstract.py.
        result = await self._backend._astop(*args, **kwargs)
        self._is_started = False
        return result

    async def apublish(self, channel: str, message: Any, *args, testing=False, **kwargs) -> None:
        """Publish message to channel asynchronously"""
        self._check_instance_vars()
        # Message text kept byte-identical to the original, including the two
        # tabs its backslash line continuation embedded in the string.
        assert not isinstance(message, Results), \
            "message can't be SnowieCaster.abstract.Results \t\tdatatype (don't try import it)"
        assert isinstance(channel, str), "channel's datatype is str"
        assert self._is_started, "Backend is not started, SnowieCaster.start()"
        result = await self._backend._apublish(channel, message, *args, **kwargs)
        if not testing:
            await self._update_subscriptions(channel)
        # NOTE(review): the visible excerpt is truncated after this point;
        # `result` is presumably used by the rest of the method -- confirm
        # against the full source.
Source:goto_method  
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copy right 2020 Takuya TWUGO all right reseved
#
# This node enables you to control the robot's moving velocity from the cloud using AWS IoT.
#
#
import rospy
import json
from geometry_msgs.msg import Twist
from std_msgs.msg import String
from nav_msgs.msg import Odometry
import time
import tf
from geometry_msgs.msg import Vector3
import math

LINERAR_MAX_SPEED = 0.22
ANGULER_MAX_SPEED = 2.84


class TwugoMethod():
    """Forward goals received on /twugo_method to /awsiot_to_ros.

    Each accepted goal is published as a JSON "navigation"/"setGoal" command.
    A stopped robot can be sent to the stored goal again through
    /twugo_method/restart.
    """

    # Seconds to sleep in main() when no future sync time is pending.
    _IDLE_SLEEP_SEC = 0.05

    def __init__(self):
        self._cmd_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)
        self._odom_sub = rospy.Subscriber(
            "/odom", Odometry, self.odom_cb, queue_size=1)
        self._twugo_method_sub = rospy.Subscriber(
            "/twugo_method", String, self.set_goal, queue_size=1)
        self._restart_method_sub = rospy.Subscriber(
            "/twugo_method/restart", String, self.restart_cb, queue_size=1)
        self._awsiot_to_ros_pub = rospy.Publisher(
            "/awsiot_to_ros", String, queue_size=1)
        self._twist = Twist()
        self._next_synctime = time.time()
        self._is_stoped = False
        self._is_started = False
        # Flag used so the robot does not stop before reaching the destination.
        self._is_before_destination = False
        self._turn_p = 0.95
        self._x_goal = 0
        self._y_goal = 0

    def main(self):
        """Keep the node alive until ROS shuts down."""
        while not rospy.is_shutdown():
            d = self._next_synctime - time.time()
            # The original looped without sleeping whenever the sync time was
            # already in the past, pinning a CPU core; idle briefly instead.
            time.sleep(d if d > 0 else self._IDLE_SLEEP_SEC)

    def _publish_goal(self):
        """Publish a navigation setGoal command for the stored goal."""
        payload = {
            "command": "navigation",
            "action": "setGoal",
            "request_id": time.time(),
            "x": self._x_goal,
            "y": self._y_goal,
            "yaw": 0,
        }
        self._awsiot_to_ros_pub.publish(json.dumps({"payload": payload}))

    def set_goal(self, data):
        """Handle a /twugo_method message.

        ``data.data`` is a JSON object read for the keys "is_goal",
        "is_destination", "x" and "y".
        """
        self._is_started = True
        payload = json.loads(data.data)
        if payload["is_goal"]:
            self._is_started = False
            return
        elif self._is_before_destination:
            # The previous message was flagged "is_destination": store the new
            # coordinates below, but do not publish them.
            self._is_started = False

        self._is_before_destination = payload["is_destination"]
        self._x_goal = payload["x"]
        self._y_goal = payload["y"]
        if not self._is_started:
            return
        # GOTO
        self._publish_goal()

    def restart_cb(self, data):
        """
        Handler for restarting a robot that is stopped at a checkpoint.
        Example of the command used to restart it:
        $ rostopic pub --once -v /twugo_method/restart std_msgs/String '{data: "hoge"}'
        """
        self._is_started = True
        # GOTO
        self._publish_goal()
        rospy.loginfo("[goto_method]: restarted (%f, %f)" % (self._x_goal, self._y_goal))

    def odom_cb(self, data):
        """Refresh the sync deadline and mirror the started flag into the stopped flag."""
        self._next_synctime = time.time() + 0.3
        # Equivalent to the original branch sequence: stopped exactly when
        # not started.
        self._is_stoped = not self._is_started

    @staticmethod
    def sign(x):
        """Return -1, 0 or 1 according to the sign of x."""
        return (x > 0) - (x < 0)


def main():
    """Initialise the 'twugo_method' node and run its keep-alive loop."""
    rospy.init_node('twugo_method')
    remote_controller = TwugoMethod()
    remote_controller.main()


if __name__ == '__main__':
    # NOTE(review): the excerpt is truncated after the guard; calling main()
    # is the presumed body -- confirm against the full source.
    main()
Source:screenshot.py  
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import signal
import tempfile
from pylib import cmd_helper
# TODO(jbudorick) Remove once telemetry gets switched over.
import pylib.android_commands
import pylib.device.device_utils


class VideoRecorder(object):
  """Records a screen capture video from an Android Device (KitKat or newer).

  Args:
    device: DeviceUtils instance.
    megabits_per_second: Video bitrate in megabits per second. Allowed range
                         from 0.1 to 100 mbps.
    size: Video frame size tuple (width, height) or None to use the device
          default.
    rotate: If True, the video will be rotated 90 degrees.
  """

  def __init__(self, device, megabits_per_second=4, size=None,
               rotate=False):
    # TODO(jbudorick) Remove once telemetry gets switched over.
    if isinstance(device, pylib.android_commands.AndroidCommands):
      device = pylib.device.device_utils.DeviceUtils(device)
    self._device = device
    self._device_file = (
        '%s/screen-recording.mp4' % device.GetExternalStoragePath())
    self._recorder = None
    self._recorder_stdout = None
    # Open handle on the recorder's stdout file; closed again in Stop().
    self._recorder_stdout_file = None
    self._is_started = False
    self._args = ['adb']
    if str(self._device):
      self._args += ['-s', str(self._device)]
    self._args += ['shell', 'screenrecord', '--verbose']
    # Emit an integer bits-per-second value even when a fractional megabit
    # rate is given (0.1 would otherwise be rendered as '100000.0').
    self._args += ['--bit-rate',
                   str(int(round(megabits_per_second * 1000 * 1000)))]
    if size:
      self._args += ['--size', '%dx%d' % size]
    if rotate:
      self._args += ['--rotate']
    self._args += [self._device_file]

  def Start(self):
    """Start recording video.

    Raises:
      RuntimeError: if no 'screenrecord' process is found on the device
          after launching the recorder.
    """
    # mkstemp() returns an already-open descriptor. Reuse it for the
    # recorder's stdout instead of leaking it and opening the path again.
    fd, self._recorder_stdout = tempfile.mkstemp()
    self._recorder_stdout_file = os.fdopen(fd, 'w')
    self._recorder = cmd_helper.Popen(
        self._args, stdout=self._recorder_stdout_file)
    if not self._device.GetPids('screenrecord'):
      raise RuntimeError('Recording failed. Is your device running Android '
                         'KitKat or later?')

  def IsStarted(self):
    """Return True once the recorder output contains a 'Content area is ' line."""
    if not self._is_started and self._recorder_stdout:
      with open(self._recorder_stdout) as recorder_output:
        self._is_started = any(
            line.startswith('Content area is ') for line in recorder_output)
    return self._is_started

  def Stop(self):
    """Stop recording video."""
    if self._recorder_stdout_file:
      self._recorder_stdout_file.close()
      self._recorder_stdout_file = None
    if self._recorder_stdout:
      # Guarded so that calling Stop() before Start() does not raise.
      os.remove(self._recorder_stdout)
      self._recorder_stdout = None
    self._is_started = False
    if not self._recorder:
      return
    self._device.KillAll('screenrecord', signum=signal.SIGINT)
    self._recorder.wait()

  def Pull(self, host_file=None):
    """Pull resulting video file from the device.

    Args:
      host_file: Path to the video file to store on the host.
    Returns:
      Output video file name on the host.
    """
    host_file_name = host_file or ('screen-recording-%s.mp4' %
                                   self._device.old_interface.GetTimestamp())
    host_file_name = os.path.abspath(host_file_name)
    self._device.old_interface.EnsureHostDirectory(host_file_name)
    self._device.PullFile(self._device_file, host_file_name)
    self._device.RunShellCommand('rm -f "%s"' % self._device_file)
    # NOTE(review): the excerpt is truncated after the line above; the return
    # below implements the "Returns" contract stated in the docstring.
    return host_file_name

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides
# to help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing for FREE!
