How to use the register_publisher method in localstack

Best Python code snippet using localstack_python

atl.py

Source:atl.py Github

copy

Full Screen

# ... (lines 1-12 of atl.py are not part of this excerpt; presumably they
# import rospy, pi and the message types used below)


class ROSNode(object):
    """Base class that keeps rospy publishers and subscribers keyed by topic."""

    def __init__(self):
        self.pubs = {}  # topic name -> rospy.Publisher
        self.subs = {}  # topic name -> rospy.Subscriber

    def register_publisher(self, topic, msg_type, queue_size=1):
        """Create a publisher for `topic` and store it in `self.pubs`.

        Args:
            topic: Topic name; also used as the key in `self.pubs`.
            msg_type: Message class passed to `rospy.Publisher`.
            queue_size: Forwarded to `rospy.Publisher` (defaults to 1).
        """
        self.pubs[topic] = rospy.Publisher(topic,
                                           msg_type,
                                           queue_size=queue_size)

    def register_subscriber(self, topic, msg_type, callback):
        """Create a subscriber for `topic` and store it in `self.subs`.

        Args:
            topic: Topic name; also used as the key in `self.subs`.
            msg_type: Message class passed to `rospy.Subscriber`.
            callback: Callable passed to `rospy.Subscriber`.
        """
        self.subs[topic] = rospy.Subscriber(topic, msg_type, callback)


class Camera(ROSNode):
    """Publishes camera mode strings."""

    def __init__(self):
        super(Camera, self).__init__()
        # image_topic is only stored here; nothing in this excerpt uses it.
        self.image_topic = "/atl/camera/image"
        self.mode_topic = "/prototype/camera/mode"
        self.register_publisher(self.mode_topic, String)

    def set_mode(self, mode):
        """Publish `mode` as a String message on the mode topic."""
        msg = String()
        msg.data = mode
        self.pubs[self.mode_topic].publish(msg)


class LandingZone(ROSNode):
    """Publishes position, velocity and angular-velocity values for the LZ."""

    def __init__(self):
        super(LandingZone, self).__init__()
        self.position_topic = "/atl/lz/position/set"
        self.velocity_topic = "/atl/lz/velocity/set"
        self.angular_velocity_topic = "/atl/lz/angular_velocity/set"
        self.register_publisher(self.position_topic, Vector3)
        self.register_publisher(self.velocity_topic, Float64)
        self.register_publisher(self.angular_velocity_topic, Float64)

    def set_position(self, pos):
        """Publish `pos[0:3]` as the x, y, z fields of a Vector3."""
        msg = Vector3()
        msg.x = pos[0]
        msg.y = pos[1]
        msg.z = pos[2]
        self.pubs[self.position_topic].publish(msg)

    def set_velocity(self, vel):
        """Publish `vel` as a Float64 on the velocity topic."""
        msg = Float64()
        msg.data = vel
        self.pubs[self.velocity_topic].publish(msg)

    def set_angular_velocity(self, ang_vel):
        """Publish `ang_vel` as a Float64 on the angular-velocity topic."""
        msg = Float64()
        msg.data = ang_vel
        self.pubs[self.angular_velocity_topic].publish(msg)


class Gimbal(ROSNode):
    """Publishes gimbal activation, attitude setpoints and track targets."""

    def __init__(self):
        super(Gimbal, self).__init__()
        self.activate_topic = "/atl/gimbal/activate"
        self.attitude_topic = "/atl/gimbal/setpoint/attitude"
        self.track_topic = "/atl/gimbal/track"
        self.register_publisher(self.activate_topic, Bool)
        self.register_publisher(self.attitude_topic, Vector3)
        self.register_publisher(self.track_topic, Vector3)

    def activate(self, value):
        """Publish `value` as a Bool on the activate topic."""
        msg = Bool()
        msg.data = value
        self.pubs[self.activate_topic].publish(msg)

    def set_attitude(self, attitude):
        """Publish `attitude[0]`, `attitude[1]` as x, y; z is always 0.0."""
        msg = Vector3()
        msg.x = attitude[0]
        msg.y = attitude[1]
        msg.z = 0.0
        self.pubs[self.attitude_topic].publish(msg)

    def track(self, track):
        """Publish `track[0]`, `track[1]` as x, y; z is always 0.0."""
        msg = Vector3()
        msg.x = track[0]
        msg.y = track[1]
        msg.z = 0.0
        self.pubs[self.track_topic].publish(msg)


class Quadrotor(ROSNode):
    """Publishes arming, mode, setpoint and controller-settings messages."""

    def __init__(self):
        super(Quadrotor, self).__init__()
        self.arm_topic = "/atl/control/arm"
        self.mode_topic = "/atl/control/mode"
        self.heading_topic = "/atl/control/yaw/set"
        self.hover_point_topic = "/atl/control/hover/set"
        self.hover_height_topic = "/atl/control/hover/height/set"
        self.pctrl_set_topic = "/atl/control/position_controller/set"
        self.tctrl_set_topic = "/atl/control/tracking_controller/set"
        self.lctrl_set_topic = "/atl/control/landing_controller/set"
        self.register_publisher(self.arm_topic, Bool)
        self.register_publisher(self.mode_topic, String)
        self.register_publisher(self.heading_topic, Float64)
        self.register_publisher(self.hover_point_topic, Vector3)
        self.register_publisher(self.hover_height_topic, Float64)
        self.register_publisher(self.pctrl_set_topic, PCtrlSettings)
        self.register_publisher(self.tctrl_set_topic, TCtrlSettings)
        self.register_publisher(self.lctrl_set_topic, LCtrlSettings)

    def set_arm(self, arm):
        """Publish `arm` as a Bool on the arm topic."""
        msg = Bool()
        msg.data = arm
        self.pubs[self.arm_topic].publish(msg)

    def set_mode(self, mode):
        """Publish `mode` as a String on the mode topic."""
        msg = String()
        msg.data = mode
        self.pubs[self.mode_topic].publish(msg)

    def set_yaw(self, yaw):
        """Publish `yaw` scaled by pi / 180 on the heading topic.

        The scaling is a degrees-to-radians conversion, assuming `pi` is
        `math.pi` (its import is outside this excerpt).
        """
        msg = Float64()
        msg.data = yaw * (pi / 180.0)
        self.pubs[self.heading_topic].publish(msg)

    def set_hover_point(self, hover_point):
        """Publish `hover_point[0:3]` as the x, y, z fields of a Vector3."""
        msg = Vector3()
        msg.x = hover_point[0]
        msg.y = hover_point[1]
        msg.z = hover_point[2]
        self.pubs[self.hover_point_topic].publish(msg)

    def set_hover_height(self, hover_height):
        """Publish `hover_height` as a Float64 on the hover-height topic."""
        msg = Float64()
        msg.data = hover_height
        self.pubs[self.hover_height_topic].publish(msg)

    def set_pctrl_settings(self, params):
        """Publish position-controller settings read from `params`.

        Args:
            params: Nested mapping with "roll" and "pitch" entries (keys
                "min", "max", "k_p", "k_i", "k_d") and a "throttle" entry
                (keys "k_p", "k_i", "k_d", "hover").
        """
        msg = PCtrlSettings()
        msg.roll_controller.min = params["roll"]["min"]
        msg.roll_controller.max = params["roll"]["max"]
        msg.roll_controller.k_p = params["roll"]["k_p"]
        msg.roll_controller.k_i = params["roll"]["k_i"]
        msg.roll_controller.k_d = params["roll"]["k_d"]
        msg.pitch_controller.min = params["pitch"]["min"]
        msg.pitch_controller.max = params["pitch"]["max"]
        msg.pitch_controller.k_p = params["pitch"]["k_p"]
        msg.pitch_controller.k_i = params["pitch"]["k_i"]
        msg.pitch_controller.k_d = params["pitch"]["k_d"]
        msg.throttle_controller.k_p = params["throttle"]["k_p"]
        msg.throttle_controller.k_i = params["throttle"]["k_i"]
        msg.throttle_controller.k_d = params["throttle"]["k_d"]
        msg.hover_throttle = params["throttle"]["hover"]
        self.pubs[self.pctrl_set_topic].publish(msg)

    def set_tctrl_settings(self, params):
        """Publish tracking-controller settings read from `params`.

        Args:
            params: Nested mapping with the same "roll", "pitch" and
                "throttle" entries as `set_pctrl_settings`, plus "vx", "vy"
                and "vz" entries (keys "k_p", "k_i", "k_d").
        """
        msg = TCtrlSettings()
        msg.roll_controller.min = params["roll"]["min"]
        msg.roll_controller.max = params["roll"]["max"]
        msg.roll_controller.k_p = params["roll"]["k_p"]
        msg.roll_controller.k_i = params["roll"]["k_i"]
        msg.roll_controller.k_d = params["roll"]["k_d"]
        msg.pitch_controller.min = params["pitch"]["min"]
        msg.pitch_controller.max = params["pitch"]["max"]
        msg.pitch_controller.k_p = params["pitch"]["k_p"]
        msg.pitch_controller.k_i = params["pitch"]["k_i"]
        msg.pitch_controller.k_d = params["pitch"]["k_d"]
        msg.throttle_controller.k_p = params["throttle"]["k_p"]
        msg.throttle_controller.k_i = params["throttle"]["k_i"]
        msg.throttle_controller.k_d = params["throttle"]["k_d"]
        msg.hover_throttle = params["throttle"]["hover"]
        msg.vx_controller.k_p = params["vx"]["k_p"]
        msg.vx_controller.k_i = params["vx"]["k_i"]
        msg.vx_controller.k_d = params["vx"]["k_d"]
        msg.vy_controller.k_p = params["vy"]["k_p"]
        msg.vy_controller.k_i = params["vy"]["k_i"]
        msg.vy_controller.k_d = params["vy"]["k_d"]
        msg.vz_controller.k_p = params["vz"]["k_p"]
        msg.vz_controller.k_i = params["vz"]["k_i"]
        msg.vz_controller.k_d = params["vz"]["k_d"]
        self.pubs[self.tctrl_set_topic].publish(msg)

    def set_lctrl_settings(self, params):
        """Publish landing-controller settings read from `params`.

        Args:
            params: Nested mapping with the same "roll", "pitch" and
                "throttle" entries as `set_pctrl_settings`, plus "vx", "vy"
                and "vz" entries (keys "k_p", "k_i", "k_d").
        """
        msg = LCtrlSettings()
        msg.roll_controller.min = params["roll"]["min"]
        msg.roll_controller.max = params["roll"]["max"]
        msg.roll_controller.k_p = params["roll"]["k_p"]
        msg.roll_controller.k_i = params["roll"]["k_i"]
        msg.roll_controller.k_d = params["roll"]["k_d"]
        msg.pitch_controller.min = params["pitch"]["min"]
        msg.pitch_controller.max = params["pitch"]["max"]
        msg.pitch_controller.k_p = params["pitch"]["k_p"]
        msg.pitch_controller.k_i = params["pitch"]["k_i"]
        msg.pitch_controller.k_d = params["pitch"]["k_d"]
        msg.throttle_controller.k_p = params["throttle"]["k_p"]
        msg.throttle_controller.k_i = params["throttle"]["k_i"]
        msg.throttle_controller.k_d = params["throttle"]["k_d"]
        msg.hover_throttle = params["throttle"]["hover"]
        msg.vx_controller.k_p = params["vx"]["k_p"]
        msg.vx_controller.k_i = params["vx"]["k_i"]
        msg.vx_controller.k_d = params["vx"]["k_d"]
        msg.vy_controller.k_p = params["vy"]["k_p"]
        msg.vy_controller.k_i = params["vy"]["k_i"]
        msg.vy_controller.k_d = params["vy"]["k_d"]
        msg.vz_controller.k_p = params["vz"]["k_p"]
        msg.vz_controller.k_i = params["vz"]["k_i"]
        msg.vz_controller.k_d = params["vz"]["k_d"]
        self.pubs[self.lctrl_set_topic].publish(msg)


class World(ROSNode):
    """Publishes model poses on the world model-pose topic."""

    def __init__(self):
        super(World, self).__init__()
        self.model_pose_topic = "/prototype/world/model/pose"
        self.register_publisher(self.model_pose_topic, ModelPose)

    def set_model_pose(self, model_name, pos, rpy):
        """Publish a ModelPose for `model_name`.

        Args:
            model_name: Written to `msg.model_name.data`.
            pos: Indexable; elements 0-2 fill `msg.position` x, y, z.
            rpy: Indexable; elements 0-2 fill `msg.orientation` x, y, z.
        """
        msg = ModelPose()
        msg.model_name.data = model_name
        msg.position.x = pos[0]
        msg.position.y = pos[1]
        msg.position.z = pos[2]
        msg.orientation.x = rpy[0]
        msg.orientation.y = rpy[1]
        msg.orientation.z = rpy[2]
        self.pubs[self.model_pose_topic].publish(msg)


class MAVROS(ROSNode):
    """Builds PoseStamped messages for the MAVROS local-position topic."""

    def __init__(self):
        super(MAVROS, self).__init__()
        self.local_pose_topic = "/mavros/local_position/pose"
        self.register_publisher(self.local_pose_topic, PoseStamped)

    def set_local_pose(self, x, y, z):
        """Fill a PoseStamped with position (x, y, z) and orientation (0, 0, 0, 1)."""
        msg = PoseStamped()
        msg.pose.position.x = x
        msg.pose.position.y = y
        msg.pose.position.z = z
        msg.pose.orientation.x = 0
        msg.pose.orientation.y = 0
        msg.pose.orientation.z = 0
        msg.pose.orientation.w = 1
        # ... (the excerpt ends here; the rest of this method is not shown)

Full Screen

Full Screen

test_publishers.py

Source:test_publishers.py Github

copy

Full Screen

...17 self.assertEqual(publishers, {PRE_PUBLISHER_TYPE: {},18 POST_PUBLISHER_TYPE: {}})19 def some_callable():20 pass21 register_publisher(some_callable, 'Test')22 self.assertTrue('test' in publishers[PRE_PUBLISHER_TYPE])23 self.assertTrue(24 isinstance(publishers[PRE_PUBLISHER_TYPE]['test'], list)25 )26 self.assertTrue(publishers[PRE_PUBLISHER_TYPE]['test'][0],27 some_callable)28 def test_registering_a_post_publisher(self):29 """testing if registering a post publisher is working properly30 """31 self.assertEqual(publishers, {PRE_PUBLISHER_TYPE: {},32 POST_PUBLISHER_TYPE: {}})33 def some_callable():34 pass35 register_publisher(some_callable, 'Test', POST_PUBLISHER_TYPE)36 self.assertTrue('test' in publishers[POST_PUBLISHER_TYPE])37 self.assertTrue(38 isinstance(publishers[POST_PUBLISHER_TYPE]['test'], list)39 )40 self.assertTrue(publishers[POST_PUBLISHER_TYPE]['test'][0],41 some_callable)42 def test_registering_with_the_decorator_is_working_properly(self):43 """testing if registering with decorator is working properly44 """45 @publisher('Test')46 def some_callable():47 pass48 self.assertTrue('test' in publishers[PRE_PUBLISHER_TYPE])49 self.assertTrue(50 isinstance(publishers[PRE_PUBLISHER_TYPE]['test'], list)51 )52 self.assertTrue(53 publishers[PRE_PUBLISHER_TYPE]['test'][0],54 some_callable55 )56 def test_run_publishers_is_working_properly(self):57 """testing if the run_publishers() function calls all the publishers58 under given name59 """60 called = []61 @publisher('Test')62 def func1():63 called.append('func1')64 @publisher('Test')65 def func2():66 called.append('func2')67 @publisher('Test2')68 def func3():69 """another publisher for some other type70 """71 called.append('func3')72 @publisher73 def func4():74 """this should run for everything75 """76 called.append('func4')77 # be sure that the called is still empty78 self.assertEqual(called, [])79 # now run all publishers80 run_publishers('Test')81 self.assertEqual(called, ['func4', 'func1', 'func2'])82 def 
test_run_publishers_is_working_properly_with_pre_publishers_specified(self):83 """testing if the run_publishers() function calls all the pre84 publishers under given name85 """86 called = []87 @publisher('Test', publisher_type=PRE_PUBLISHER_TYPE)88 def func1():89 called.append('func1')90 @publisher('Test', publisher_type=PRE_PUBLISHER_TYPE)91 def func2():92 called.append('func2')93 @publisher('Test2', publisher_type=PRE_PUBLISHER_TYPE)94 def func3():95 """another publisher for some other type96 """97 called.append('func3')98 @publisher(publisher_type=PRE_PUBLISHER_TYPE)99 def func4():100 """this should run for everything101 """102 called.append('func4')103 # be sure that the called is still empty104 self.assertEqual(called, [])105 # now run all publishers106 run_publishers('Test')107 self.assertEqual(called, ['func4', 'func1', 'func2'])108 def test_run_publishers_is_working_properly_with_post_publishers_specified(self):109 """testing if the run_publishers() function calls all the post110 publishers under given name111 """112 called = []113 # Register some pre publishers114 @publisher('Test', publisher_type=PRE_PUBLISHER_TYPE)115 def pre_func1():116 called.append('pre_func1')117 @publisher('Test', publisher_type=PRE_PUBLISHER_TYPE)118 def pre_func2():119 called.append('pre_func2')120 @publisher('Test2', publisher_type=PRE_PUBLISHER_TYPE)121 def pre_func3():122 """another publisher for some other type123 """124 called.append('pre_func3')125 @publisher(publisher_type=PRE_PUBLISHER_TYPE)126 def pre_func4():127 """this should run for everything128 """129 called.append('pre_func4')130 # Register some post publishers131 @publisher('Test', publisher_type=POST_PUBLISHER_TYPE)132 def func1():133 called.append('func1')134 @publisher('Test', publisher_type=POST_PUBLISHER_TYPE)135 def func2():136 called.append('func2')137 @publisher('Test2', publisher_type=POST_PUBLISHER_TYPE)138 def func3():139 """another publisher for some other type140 """141 called.append('func3')142 
@publisher(publisher_type=POST_PUBLISHER_TYPE)143 def func4():144 """this should run for everything145 """146 called.append('func4')147 # be sure that the called is still empty148 self.assertEqual(called, [])149 # now run all publishers150 run_publishers('Test', publisher_type=POST_PUBLISHER_TYPE)151 self.assertEqual(called, ['func4', 'func1', 'func2'])152 def test_registering_a_callable_multiple_times(self):153 """testing if registering a callable multiple times will not call the154 function more than 1 time155 """156 called = []157 @publisher('Test')158 def func1():159 called.append('func1')160 @publisher('Test')161 def func2():162 called.append('func2')163 @publisher('Test2')164 def func3():165 """another publisher for some other type166 """167 called.append('func3')168 @publisher169 def func4():170 """this should run for everything171 """172 called.append('func4')173 # be sure that the called is still empty174 self.assertEqual(called, [])175 # register fun1 and func2 more than one times176 register_publisher(func1, 'Test')177 register_publisher(func2, 'Test')178 register_publisher(func1, 'Test')179 register_publisher(func2, 'Test')180 # now run all publishers181 run_publishers('Test')182 self.assertEqual(called, ['func4', 'func1', 'func2'])183 def test_calling_base_callable_multiple_times(self):184 """testing if calling the base publishers will not call the functions185 more than 1 time186 """187 called = []188 @publisher('Test')189 def func1():190 called.append('func1')191 @publisher('Test')192 def func2():193 called.append('func2')194 @publisher('Test2')195 def func3():196 """another publisher for some other type197 """198 called.append('func3')199 @publisher200 def func4():201 """this should run for everything202 """203 called.append('func4')204 # be sure that the called is still empty205 self.assertEqual(called, [])206 # now run all publishers207 run_publishers()208 self.assertEqual(called, ['func4'])209 def test_callable_is_not_callable(self):210 
"""testing if the callable is not a callable will raise a TypeError211 """212 with self.assertRaises(TypeError) as cm:213 not_a_callable = 'this is not callable'214 register_publisher(not_a_callable, 'Test')215 self.assertEqual('str is not callable', str(cm.exception))216 def test_registering_to_multiple_types_with_lists(self):217 """testing if it is possible to register one publisher for multiple218 types219 """220 called = []221 @publisher(['Test', 'Test2'])222 def func1():223 called.append('func1')224 @publisher(['Test', 'Test3'])225 def func2():226 called.append('func2')227 @publisher(['Test2', 'Test3'])228 def func3():...

Full Screen

Full Screen

publishers_manager.py

Source:publishers_manager.py Github

copy

Full Screen

# ... (lines 1-4 of publishers_manager.py are not part of this excerpt;
# presumably they import the other publisher classes used in initialize())
from publishers.misp_publisher import MISPPublisher
from schema.publisher import PublisherInputSchema

# Registry of publisher instances, keyed by each instance's `type` attribute.
publishers = {}


def initialize():
    """Register one instance of every publisher class listed below."""
    register_publisher(FTPPublisher())
    register_publisher(EMAILPublisher())
    register_publisher(TWITTERPublisher())
    register_publisher(WORDPRESSPublisher())
    register_publisher(MISPPublisher())


def register_publisher(publisher):
    """Store `publisher` in the registry under `publisher.type`.

    A publisher registered later under the same `type` replaces the
    earlier one.
    """
    publishers[publisher.type] = publisher


def get_registered_publishers_info():
    """Return a list with `get_info()` of every registered publisher."""
    return [registered.get_info() for registered in publishers.values()]


def publish(publisher_input_json):
    """Load `publisher_input_json` through `PublisherInputSchema`."""
    publisher_input_schema = PublisherInputSchema()
    publisher_input = publisher_input_schema.load(publisher_input_json)
    # ... (the excerpt ends here; the rest of this function is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. The guides take you from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful