How to use get_all_controllers method in autotest

Best Python code snippet using autotest_python

tasks.py

Source:tasks.py Github

copy

Full Screen

1from __future__ import absolute_import, unicode_literals2import copy3import json4import requests5from celery import task6from django.conf import settings7from django.core.mail import send_mail8from django.http import HttpResponse9from .models import Setting10TOKEN = settings.SMART_HOME_ACCESS_TOKEN 11header = {'Authorization': 'Bearer {}'.format(settings.SMART_HOME_ACCESS_TOKEN)}12GET_ALL_CONTROLLERS = settings.SMART_HOME_API_URL13class Processor(object):14 """ Класс логики работы с умным домом """15 # Processor является синглтоном16 obj = None17 def __new__(cls, *args, **kwargs):18 if not cls.obj:19 cls.obj = super().__new__(cls)20 return cls.obj21 storage = {22 'hot_water_target_temperature': 80,23 'bedroom_target_temperature': 2124 } # хранилище данных на сервере25 @classmethod26 def _read_all_controllers(cls):27 """ Функция опроса всех контроллеров """28 try: # отправка запроса на сервер29 r = requests.get(GET_ALL_CONTROLLERS, headers=header)30 print(f'Отправлен запрос по адресу {GET_ALL_CONTROLLERS}')31 for position in r.json()['data']: # парсинг ответа от сервера32 if position['name'] not in cls.storage:33 cls.storage[position['name']] = None34 cls.storage[position['name']] = position['value'] # любые данные записываем в хранилище35 try: # берем значения из БД, которые заполняются через форму36 cls.storage['hot_water_target_temperature'] = Setting.objects.get(37 controller_name='hot_water_target_temperature').value38 cls.storage['bedroom_target_temperature'] = Setting.objects.get(39 controller_name='bedroom_target_temperature').value40 except Setting.DoesNotExist:41 pass42 except json.decoder.JSONDecodeError: # проверка того что данные прошли действительно валидные 43 print(f'Запрос не дошёл до адреса')44 return HttpResponse(content='Bad Gateway', status=502)45 return cls.storage46 @classmethod47 def control_bedroom_light(cls, val): # включаем свет если датчик дыма отключен(см. views.py)48 if not cls.storage['smoke_detector']:49 return val50 else:51 return False52 @classmethod53 def control_bathroom_light(cls, val): # включаем свет если датчик дыма отключен(см. views.py)54 if not cls.storage['smoke_detector']:55 return val56 else:57 return False58 @classmethod59 def control_bedroom_target_temperature(cls, val): # пишем в БД требуемую температуру(см. views.py)60 try:61 s = Setting.objects.filter(controller_name='bedroom_target_temperature')62 if val != s[0].value:63 s.update(value=val)64 print('Новые данные по температуре в комнате')65 except Setting.DoesNotExist:66 Setting.objects.create(controller_name='bedroom_target_temperature', label='Желаемая температура в спальне',67 value=val)68 return val69 @classmethod70 def control_hot_water_target_temperature(cls, val): # пишем в БД требуемую температуру(см. views.py)71 try:72 s = Setting.objects.filter(controller_name='hot_water_target_temperature')73 if val != s[0].value:74 s.update(value=val)75 print('Новые данные по температуре горячей воды')76 except Setting.DoesNotExist:77 Setting.objects.create(controller_name='hot_water_target_temperature',78 label='Желаемая температура горячей воды', value=val)79 return val80 @classmethod81 def check_signalization(cls): # проверяем все датчики на наличие аварийных ситуаций82 # логика работы при температуре ниже требуемой83 if cls.storage['bedroom_temperature'] > 1.1 * cls.storage['bedroom_target_temperature']:84 cls.storage['air_conditioner'] = True85 if cls.storage['bedroom_temperature'] < 0.9 * cls.storage['bedroom_target_temperature']:86 cls.storage['air_conditioner'] = False87 # логика работы при температуре ниже требуемой88 try:89 if cls.storage['boiler_temperature'] < 0.9 * cls.storage['hot_water_target_temperature']:90 cls.storage['boiler'] = True91 if cls.storage['boiler_temperature'] >= 1.1 * cls.storage['hot_water_target_temperature']:92 cls.storage['boiler'] = False93 except TypeError:94 pass95 if cls.storage['leak_detector']: # логика работы при протечке96 send_mail(subject='Авария дома', message='В доме случилась протечка воды', from_email='from@example.com',97 recipient_list=[settings.EMAIL_RECEPIENT, ])98 cls.storage['cold_water'] = False99 cls.storage['hot_water'] = False100 if not cls.storage['cold_water']: # логика работы при отключенной холодной воде101 cls.storage['boiler'] = False102 cls.storage['washing_machine'] = 'off'103 if cls.storage['smoke_detector']: # логика работы при включенном датчике задымления104 cls.storage['air_conditioner'] = False105 cls.storage['bedroom_light'] = False106 cls.storage['bathroom_light'] = False107 cls.storage['boiler'] = False108 cls.storage['washing_machine'] = 'off'109 if cls.storage['outdoor_light'] < 50 and cls.storage['bedroom_light'] is False and cls.storage['curtains'] != 'slightly_open':110 cls.storage['curtains'] = 'open'111 elif cls.storage['outdoor_light'] > 50 and cls.storage['curtains'] != 'slightly_open':112 cls.storage['curtains'] = 'close'113 @classmethod114 def _write_all_controllers(cls, some_data): # метод для записи данных в контроллеры115 data_in_controllers = []116 for key in some_data.keys():117 data_in_controllers.append({'name': key, 'value': some_data[key]})118 data_to_send = json.dumps({'controllers': data_in_controllers})119 requests.post(GET_ALL_CONTROLLERS, data=data_to_send, headers=header)120 print('Запрос отправлен на сервер')121@task()122def smart_home_manager():123 d1 = copy.deepcopy(Processor._read_all_controllers()) # получаю данные из запроса к API124 print('Данные полученные от сервера:')125 print(d1)126 Processor.check_signalization()127 d2 = copy.deepcopy(Processor.storage)128 print('Данные после обработки:')129 print(d2)130 for key in d1:131 if d1[key] == d2[key]:132 d2.pop(key, None)133 if len(d2) > 0:134 Processor._write_all_controllers(d2)135 print('Данные отправлены на сервер')136 print('Данные для отправки:')137 print(d2)138 else:...

Full Screen

Full Screen

control_arbiter.py

Source:control_arbiter.py Github

copy

Full Screen

1#!/usr/bin/env python2import rospy3import roslib4import threading5from controller.msg import ControlDynamixelContinuousAngleConfig6from dynamixel_servo.msg import DynamixelContinuousAngleConfig7from controller.msg import ControlDynamixelFullConfig8from dynamixel_servo.msg import DynamixelFullConfig9from controller.msg import ControlDynamixelJointConfig10from dynamixel_servo.msg import DynamixelJointConfig11from controller.msg import ControlDynamixelWheelConfig12from dynamixel_servo.msg import DynamixelWheelConfig13from controller.msg import ControlThrustConfig14from std_msgs.msg import Float64, Bool, String15from controller.srv import register_controller, register_controllerResponse16from controller.srv import request_controller, request_controllerResponse17from controller.srv import FloatMode18from controller.srv import get_all_controllers, get_all_controllersResponse19from kill_handling.listener import KillListener20from kill_handling.broadcaster import KillBroadcaster21PORT = ControlThrustConfig.PORT22STARBOARD = ControlThrustConfig.STARBOARD23ZERO_PWM = 1.5e-324# TODO25# Add service to get all avalable controllers26# Find way that servos and thruster call backs can all fire simltaniously but still27# are thread locked with request_controller28#Notes29# Switching controllers happens in a different thread than the thrusters getting published30# It is possible that after a switch occurs and the zero thrusters method has been called31# the other thread will still call its last cb so thread lock32class control_arbiter:33 def __init__(self):34 # Initilize ros35 rospy.init_node('control_arbiter')36 # Vars37 self.controller = 'none'38 self.controllers = []39 self.floating = False40 self.killed = False41 self.continuous_angle_lock = threading.Lock()42 self.joint_lock = threading.Lock()43 self.full_lock = threading.Lock()44 self.wheel_lock = threading.Lock()45 self.thrust_lock = threading.Lock()46 # Services47 self.request_controller_srv = rospy.Service('request_controller', request_controller,48 self.request_controller_cb)49 self.register_controller_srv = rospy.Service('register_controller', register_controller,50 self.register_controller_cb)51 self.float_srv = rospy.Service('float_mode', FloatMode, 52 self.set_float_mode)53 self.get_all_controllers_srv = rospy.Service('get_all_controllers', get_all_controllers,54 self.get_all_controllers_cb)55 # Publishers56 self.continuous_angle_pub = rospy.Publisher('/dynamixel/dynamixel_continuous_angle_config', 57 DynamixelContinuousAngleConfig, queue_size = 10)58 self.full_pub = rospy.Publisher('/dynamixel/dynamixel_full_config', 59 DynamixelFullConfig, queue_size = 10)60 self.joint_pub = rospy.Publisher('/dynamixel/dynamixel_joint_config', 61 DynamixelJointConfig, queue_size = 10)62 self.wheel_pub = rospy.Publisher('/dynamixel/dynamixel_wheel_config', 63 DynamixelWheelConfig, queue_size = 10)64 self.port_pub = rospy.Publisher('/stm32f3discovery_imu_driver/pwm1', 65 Float64, queue_size = 10)66 self.starboard_pub = rospy.Publisher('/stm32f3discovery_imu_driver/pwm2', 67 Float64, queue_size = 10)68 self.float_status_pub = rospy.Publisher('float_status', Bool, queue_size=1)69 self.current_controller_pub = rospy.Publisher('current_controller', String, queue_size=1)70 # Subscribers71 self.continuous_angle_sub = rospy.Subscriber('dynamixel_continuous_angle_config', 72 ControlDynamixelContinuousAngleConfig, self.continuous_angle_cb, queue_size = 10)73 self.full_sub = rospy.Subscriber('dynamixel_full_config', 74 ControlDynamixelFullConfig, self.full_cb, queue_size = 10)75 self.joint_sub = rospy.Subscriber('dynamixel_joint_config', 76 ControlDynamixelJointConfig, self.joint_cb, queue_size = 10)77 self.wheel_sub = rospy.Subscriber('dynamixel_wheel_config', 78 ControlDynamixelWheelConfig, self.wheel_cb, queue_size = 10)79 self.thruster_sub = rospy.Subscriber('thruster_config', 80 ControlThrustConfig, self.thruster_cb, queue_size = 10)81 # Timers82 self.update_timer = rospy.Timer(rospy.Duration(0.1), self.status_update)83 # Kill84 self.kill_listener = KillListener(self.set_kill_cb, self.clear_kill_cb)85 self.kill_broadcaster = KillBroadcaster(id = 'control_arbiter', description = 'control_arbiter has shutdown')86 try:87 self.kill_broadcaster.clear()88 except rospy.service.ServiceException, e:89 rospy.logwarn(str(e))90 rospy.on_shutdown(self.shutdown)91 # Utility functions92 # Zero thrusters93 def _zero_thrusters(self):94 self.port_pub.publish(Float64(ZERO_PWM))95 self.starboard_pub.publish(Float64(ZERO_PWM))96 def isValidController(self, controller):97 if controller == 'always_on':98 return True99 if controller == self.controller:100 return True101 return False102 # Servo callbacks103 def continuous_angle_cb(self, msg):104 with self.continuous_angle_lock:105 if not self.isValidController(msg.controller):106 return107 self.continuous_angle_pub.publish(msg.config)108 def full_cb(self, msg):109 with self.full_lock:110 if not self.isValidController(msg.controller):111 return112 self.full_pub.publish(msg.config)113 def joint_cb(self, msg):114 with self.joint_lock:115 if not self.isValidController(msg.controller):116 return117 self.joint_pub.publish(msg.config)118 def wheel_cb(self, msg):119 with self.wheel_lock:120 if not self.isValidController(msg.controller):121 return122 self.wheel_pub.publish(msg.config)123 # Thrust callback124 def thruster_cb(self, msg):125 with self.thrust_lock:126 if not self.isValidController(msg.controller):127 return128 if self.floating or self.killed:129 self._zero_thrusters()130 return131 if msg.id == PORT:132 self.port_pub.publish(Float64(msg.pulse_width))133 elif msg.id == STARBOARD:134 self.starboard_pub.publish(Float64(msg.pulse_width))135 else:136 rospy.logerr(str(msg.id) + ' is not a vaild thruster id, valid ids are 2 (starboard) or 3 (port)')137 # Service callbacks138 def register_controller_cb(self, request):139 response = register_controllerResponse()140 if request.controller not in self.controllers:141 response.success = True142 self.controllers.append(request.controller)143 else:144 response.success = False145 response.failure_description = '%s is already a registered controller' % request.controller146 return response147 def request_controller_cb(self, request):148 response = request_controllerResponse()149 if request.controller in self.controllers:150 response.success = True151 response.current_controller = request.controller152 153 self.continuous_angle_lock.acquire()154 self.joint_lock.acquire()155 self.full_lock.acquire()156 self.wheel_lock.acquire()157 self.thrust_lock.acquire()158 self.controller = request.controller159 self._zero_thrusters()160 161 self.continuous_angle_lock.release()162 self.joint_lock.release()163 self.full_lock.release()164 self.wheel_lock.release()165 self.thrust_lock.release()166 else:167 response.success = False168 response.current_controller = self.controller169 response.failure_description = request.controller + ' is not a registered controller. Registered controllers are ' + str(self.controllers)170 return response171 def get_all_controllers_cb(self, request):172 response = get_all_controllersResponse()173 response.controllers = self.controllers174 return response175 # Float functions176 # Used to place boat in floating mode (Thrusters are turned off)177 def set_float_mode(self, mode):178 self.floating = mode.float179 if self.floating:180 self._zero_thrusters()181 rospy.loginfo('PropaGator float-mode is ' + str(mode.float))182 return self.floating183 # Status update function184 def status_update(self, event):185 self.float_status_pub.publish(self.floating)186 self.current_controller_pub.publish(self.controller)187 # kill funcitons188 def shutdown(self):189 self.kill_broadcaster.send(True)190 def set_kill_cb(self):191 self.killed = True192 self._zero_thrusters()193 rospy.logwarn('control_arbiter killed because: %s' % self.kill_listener.get_kills())194 def clear_kill_cb(self):195 self.killed = False196 rospy.loginfo('control_arbiter unkilled')197if __name__ == '__main__':198 node = control_arbiter()...

Full Screen

Full Screen

controllers.py

Source:controllers.py Github

copy

Full Screen

...13# License for the specific language governing permissions and limitations14# under the License.15from zadarapy.validators import verify_start_limit, verify_vc_index, \16 verify_capacity, get_parameters_options, verify_interval17def get_all_controllers(session, start=None, limit=None, return_type=None,18 **kwargs):19 """20 Retrieves details for all virtual controllers for the VPSAOS.21 :type session: zadarapy.session.Session22 :param session: A valid zadarapy.session.Session object. Required.23 :type start: int24 :param start: The offset to start displaying controllers from. Optional.25 :type: limit: int26 :param limit: The maximum number of controllers to return. Optional.27 :type return_type: str28 :param return_type: If this is set to the string 'json', this function29 will return a JSON string. Otherwise, it will return a Python30 dictionary. Optional (will return a Python dictionary by default).31 :rtype: dict, str...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful