How to use delete_topic method in localstack

Best Python code snippet using localstack_python

kafka_data.py

Source:kafka_data.py Github

copy

Full Screen

# ... (the excerpt begins inside a function whose start is not shown; its
# visible tail is preserved here as comments so the snippet stays parseable)
#             'acks': 1,
#         }
#     })
#     return producer


def delete_topic(topics_list: list):
    """Delete the given Kafka topics and log the outcome for each one.

    Args:
        topics_list: Names of the topics to delete.

    The outcome of every topic is reported through ``logging.warning``;
    a failed deletion is logged rather than raised, so one bad topic does
    not stop the remaining ones from being reported.
    """
    admin_client = AdminClient({'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})
    # delete_topics() hands back one future per requested topic.
    deleted_topics = admin_client.delete_topics(topics_list)
    for topic, future in deleted_topics.items():
        try:
            # Wait for the deletion to finish; a failure surfaces here.
            future.result()
        except Exception as e:
            logging.warning("Failed to delete topic {}: {}".format(topic, e))
        else:
            logging.warning("Topic {} deleted".format(topic))


if __name__ == "__main__":
    # sys.argv[0] is the script name, so a command is only present when
    # there are at least two entries.
    if len(sys.argv) < 2:
        sys.exit(0)
    debug_collectors = ["route-views2", "route-views.linx", "rrc00", "rrc10"]
    command = sys.argv[1]
    if command == "delete-topic":
        # The topic names arrive as one comma-separated argument.
        if len(sys.argv) < 3:
            sys.exit("delete-topic requires a comma-separated list of topic names")
        topics_name = [name.strip() for name in sys.argv[2].split(",")]
        delete_topic(topics_name)
    elif command == "delete-atom":
        for c in debug_collectors:
            delete_topic([f"ihr_bgp_atom_{c}", f"ihr_bgp_atom_meta_{c}"])
    elif command == "delete-bc-asn":
        for c in debug_collectors:
            delete_topic([f"ihr_bcscore_{c}", f"ihr_bcscore_meta_{c}"])
    elif command == "delete-bc-prefix":
        for c in debug_collectors:
            delete_topic([f"ihr_bcscore_prefix_{c}", f"ihr_bcscore_prefix_meta_{c}"])
    elif command == "delete-hege-asn":
        delete_topic(["ihr_hegemony", "ihr_hegemony_meta"])
    elif command == "delete-hege-prefix":
        delete_topic(["ihr_prefix_hegemony", "ihr_prefix_hegemony_meta"])
    elif command == "clean":
        for c in debug_collectors:
            delete_topic([f"ihr_bgp_atom_{c}", f"ihr_bgp_atom_meta_{c}"])
            delete_topic([f"ihr_bcscore_{c}", f"ihr_bcscore_meta_{c}"])
            delete_topic([f"ihr_bcscore_prefix_{c}", f"ihr_bcscore_prefix_meta_{c}"])
        # NOTE(review): indentation was lost in the excerpt; this call is
        # assumed to run once, after the per-collector loop -- confirm.
        delete_topic(["ihr_hegemony", "ihr_hegemony_meta"])
        # ... (the excerpt is truncated here)

Full Screen

Full Screen

C.py

Source:C.py Github

copy

Full Screen

# Python 2 script: it uses the `print` statement, indexes the result of
# map(), and takes len() of the result of filter().
#
# Input ('C-small-attempt3.in'): the first line is the number of cases; each
# case is a count line followed by that many lines of two whitespace-separated
# words. Output ('outC.txt'): one "Case #k: <result>" line per case.
#
# NOTE(review): the original indentation was lost when this snippet was
# extracted; the nesting below is reconstructed and should be confirmed
# against the real source.
with open('C-small-attempt3.in', 'rb') as f:
    # Strip the newline from every input line.
    lines = map(lambda x: x.replace('\n',''), f.readlines())
    cases = int(lines[0])
    lines = lines[1:]

with open('outC.txt', 'wb') as out:
    for case in range(cases):
        # Progress report on stdout.
        print 'Case ' + str(case + 1) + ' / ' + str(cases)
        num_topics = int(lines[0])
        topics = []

        # topics_a / topics_b: every first / second word seen in this case.
        # real_topics_a / real_topics_b: first / second words of the pairs
        # that have been selected so far.
        topics_a = []
        topics_b = []
        real_topics_a = []
        real_topics_b = []
        for i in range(num_topics):
            (a, b) = lines[i + 1].split()
            topics.append((a, b))
            topics_a.append(a)
            topics_b.append(b)

        # Step 1: a pair whose first word or second word occurs exactly once
        # in the case is selected and removed from the working list.
        delete_topic = []
        for topic in topics:
            if topics_a.count(topic[0]) == 1 or \
                    topics_b.count(topic[1]) == 1:
                real_topics_a.append(topic[0])
                real_topics_b.append(topic[1])
                delete_topic.append(topic)
        # Removal is deferred so the list is not mutated while iterating it.
        for topic in delete_topic:
            topics.remove(topic)

        # Step 2: repeatedly pick one more pair (`best`) and select it, until
        # a full pass over the remaining pairs picks nothing.
        best = None
        best_count = -1
        while True:
            for i in topics:
                # First word of this pair has not been selected yet.
                if real_topics_a.count(i[0]) == 0:
                    # Remaining pairs sharing the same first word.
                    equals_a = filter(lambda x: x[0] == i[0], topics)
                    if len(equals_a) > 1:
                        # find_a / best_a are assigned but never read in the
                        # visible code.
                        find_a = 0
                        best_a = None
                        for x in equals_a:
                            count = 0
                            for y in topics:
                                if y != x:
                                    # NOTE(review): x is an (a, b) tuple and
                                    # y[0] is a word, so `y[0] == x` looks like
                                    # it can never be true and count stays 0
                                    # -- confirm the intended comparison.
                                    if real_topics_a.count(y[0]) == 0 and \
                                            y[0] == x:
                                        count += 1
                            if count > best_count:
                                best_count = count
                                best = x
                # Second word of this pair has not been selected yet.
                if real_topics_b.count(i[1]) == 0:
                    # Remaining pairs sharing the same second word (the name
                    # equals_a is reused here).
                    equals_a = filter(lambda x: x[1] == i[1], topics)
                    if len(equals_a) > 1:
                        find_a = 0
                        best_a = None
                        for x in equals_a:
                            count = 0
                            for y in topics:
                                if y != x:
                                    # NOTE(review): this checks a second word
                                    # against real_topics_a (possibly meant
                                    # real_topics_b), and `y[1] == x` compares
                                    # a word with a tuple -- confirm.
                                    if real_topics_a.count(y[1]) == 0 and \
                                            y[1] == x:
                                        count += 1
                            if count > best_count:
                                best_count = count
                                best = x
            if best is not None:
                # Select the chosen pair and reset the search state.
                topics.remove(best)
                real_topics_a.append(best[0])
                real_topics_b.append(best[1])
                best = None
                best_count = -1
            else:
                break

        # Step 3: drop every remaining pair that still has a first or second
        # word missing from the selected lists; what is left is the answer.
        delete_topic = []
        for topic in topics:
            if real_topics_a.count(topic[0]) == 0 or \
                    real_topics_b.count(topic[1]) == 0:
                delete_topic.append(topic)
        for topic in delete_topic:
            topics.remove(topic)

        result = len(topics)

        out.write('Case #' + str(case + 1) + ': ' + str(result) + '\n')
        # Advance past this case's count line and its pairs.
        lines = lines[num_topics + 1:]

# ... (the excerpt ends here)

Full Screen

Full Screen

mqtt_message.py

Source:mqtt_message.py Github

copy

Full Screen

import json
import datetime
from server.http_codes import http_response_code
'''
Create a topic manager to manage overall topics
The types of topics are broadly divided into a topic to receive data transmitted from the
Arduino board and a topic to check the ping status.
Other topics can be additionally managed
'''


class MqttMessages:
    """Keep track of node ids and the MQTT topics derived from them.

    Each node gets a "data/<id>" topic and a "ping/<id>" entry; data topics
    are also stored as ``(topic, vos)`` tuples in ``mqtt_topic``.
    """

    # NOTE: these are class-level attributes. Until clear_topics() or
    # get_delete_node() rebinds them on an instance, the lists are shared by
    # every instance of the class.
    nodes = []          # known node ids
    ping_receive = []   # "ping/<id>" entries
    mqtt_topic = []     # (topic, vos) tuples
    topics = []         # plain topic strings
    ping_message = {}
    vos = 0             # value paired with each topic in mqtt_topic
    delete_topic = []   # result of the last get_delete_node() call

    def __init__(self):
        self.ping_message_format = []

    def set_vos(self, number):
        """Set the value stored alongside topics added from now on."""
        self.vos = number

    def get_nodes(self):
        """Return the list of known node ids."""
        return self.nodes

    def add_node(self, nodeid):
        """Register ``nodeid``; return True if it was new, False otherwise."""
        if nodeid in self.nodes:
            return False
        self.nodes.append(nodeid)
        return True

    def get_message_format(self, format):
        """Rebuild all node and topic lists from ``format['nodes']``.

        Every entry's ``'id'`` is converted to ``str`` in place, and the
        entry list is kept on ``self.sensors``.
        """
        self.clear_topics()
        temp = format['nodes']
        self.sensors = temp
        for node in temp:
            node['id'] = str(node['id'])
            topic = "data/" + node['id']
            self.nodes.append(node['id'])
            self.ping_receive.append("ping/" + node['id'])
            self.add_mqtt_topic(topic, self.vos)

    def add_mqtt_topic(self, topic, vos):
        """Record ``topic`` both as a plain string and as a (topic, vos) tuple."""
        self.topics.append(topic)
        self.mqtt_topic.append((topic, vos))

    def get_delete_node(self, nodeid):
        """Remove ``nodeid`` from the node list and return its topics.

        A topic belongs to the node when the segment after the first '/'
        equals ``nodeid``. The matching topics are printed, stored on
        ``self.delete_topic`` and returned; ``self.topics`` is left untouched.
        """
        self.delete_topic = []
        for topic in self.topics:
            v_topic = topic.split('/')
            # Compare by value: `is` tests object identity, which is not
            # reliable for strings. Topics without a '/' are skipped.
            if len(v_topic) > 1 and v_topic[1] == nodeid:
                # The node may match several topics; remove it only once.
                if nodeid in self.nodes:
                    self.nodes.remove(nodeid)
                self.delete_topic.append(topic)
        print(self.delete_topic)
        return self.delete_topic

    def clear_topics(self):
        """Reset the topic and node lists to fresh, instance-owned lists."""
        self.mqtt_topic = []
        self.topics = []
        self.nodes = []
        self.ping_receive = []

    # ... (the excerpt is truncated here; any further methods are not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful