How to use the stop_and_save method in ATX

Best Python code snippet using ATX

_MapCfg.py

Source:_MapCfg.py Github

copy

Full Screen

# This Python file uses the following encoding: utf-8
"""autogenerated by genpy from perception_msgs/MapCfg.msg. Do not edit."""
import sys
python3 = True if sys.hexversion > 0x03000000 else False
import genpy
import struct


class MapCfg(genpy.Message):
    """Python binding for the ``perception_msgs/MapCfg`` message.

    The wire layout handled by ``serialize``/``deserialize`` is, in order:
    three bools and one float32 (packed as ``<3Bf``), two length-prefixed
    UTF-8 strings, and nine int32 values (packed as ``<9i``).
    """
    _md5sum = "114743d989eebb15f931732b39a7a0f5"
    _type = "perception_msgs/MapCfg"
    _has_header = False  # flag to mark the presence of a Header object
    _full_text = """
bool start_or_pause
bool stop_and_save
bool closed
float32 speed_limit_kph
string bag_name
string tpk_name
####################################33
int32 rtk_mode
int32 driving_mode # acc or collision avoidance
int32 special_mode #
####################################33
int32 obs_search_strategy # 0: donot search obs
int32 speed_mode #
int32 obs_strategy #
int32 follow_strategy #
int32 cross_option #
int32 reserved_option #
"""
    __slots__ = ['start_or_pause','stop_and_save','closed','speed_limit_kph','bag_name','tpk_name','rtk_mode','driving_mode','special_mode','obs_search_strategy','speed_mode','obs_strategy','follow_strategy','cross_option','reserved_option']
    _slot_types = ['bool','bool','bool','float32','string','string','int32','int32','int32','int32','int32','int32','int32','int32','int32']

    def __init__(self, *args, **kwds):
        """
        Constructor. Any message fields that are implicitly/explicitly
        set to None will be assigned a default value. The recommended
        use is keyword arguments as this is more robust to future message
        changes.  You cannot mix in-order arguments and keyword arguments.

        The available fields are:
           start_or_pause,stop_and_save,closed,speed_limit_kph,bag_name,tpk_name,rtk_mode,driving_mode,special_mode,obs_search_strategy,speed_mode,obs_strategy,follow_strategy,cross_option,reserved_option

        :param args: complete set of field values, in .msg order
        :param kwds: use keyword arguments corresponding to message field names
        to set specific fields.
        """
        if args or kwds:
            super(MapCfg, self).__init__(*args, **kwds)
            # message fields cannot be None, assign default values for those that are
            if self.start_or_pause is None:
                self.start_or_pause = False
            if self.stop_and_save is None:
                self.stop_and_save = False
            if self.closed is None:
                self.closed = False
            if self.speed_limit_kph is None:
                self.speed_limit_kph = 0.
            if self.bag_name is None:
                self.bag_name = ''
            if self.tpk_name is None:
                self.tpk_name = ''
            if self.rtk_mode is None:
                self.rtk_mode = 0
            if self.driving_mode is None:
                self.driving_mode = 0
            if self.special_mode is None:
                self.special_mode = 0
            if self.obs_search_strategy is None:
                self.obs_search_strategy = 0
            if self.speed_mode is None:
                self.speed_mode = 0
            if self.obs_strategy is None:
                self.obs_strategy = 0
            if self.follow_strategy is None:
                self.follow_strategy = 0
            if self.cross_option is None:
                self.cross_option = 0
            if self.reserved_option is None:
                self.reserved_option = 0
        else:
            # No arguments at all: set every field to its default directly.
            self.start_or_pause = False
            self.stop_and_save = False
            self.closed = False
            self.speed_limit_kph = 0.
            self.bag_name = ''
            self.tpk_name = ''
            self.rtk_mode = 0
            self.driving_mode = 0
            self.special_mode = 0
            self.obs_search_strategy = 0
            self.speed_mode = 0
            self.obs_strategy = 0
            self.follow_strategy = 0
            self.cross_option = 0
            self.reserved_option = 0

    def _get_types(self):
        """
        internal API method
        """
        return self._slot_types

    def serialize(self, buff):
        """
        serialize message into buffer
        :param buff: buffer, ``StringIO``
        """
        try:
            _x = self
            buff.write(_get_struct_3Bf().pack(_x.start_or_pause, _x.stop_and_save, _x.closed, _x.speed_limit_kph))
            _x = self.bag_name
            length = len(_x)
            # Text is written as UTF-8 bytes, so the length prefix must be
            # recomputed after encoding (byte count, not character count).
            if python3 or type(_x) == unicode:
                _x = _x.encode('utf-8')
                length = len(_x)
            buff.write(struct.pack('<I%ss'%length, length, _x))
            _x = self.tpk_name
            length = len(_x)
            if python3 or type(_x) == unicode:
                _x = _x.encode('utf-8')
                length = len(_x)
            buff.write(struct.pack('<I%ss'%length, length, _x))
            _x = self
            buff.write(_get_struct_9i().pack(_x.rtk_mode, _x.driving_mode, _x.special_mode, _x.obs_search_strategy, _x.speed_mode, _x.obs_strategy, _x.follow_strategy, _x.cross_option, _x.reserved_option))
        except struct.error as se:
            # _x still names the value being written when packing failed.
            self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self)))))
        except TypeError as te:
            self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))

    def deserialize(self, str):
        """
        unpack serialized message in str into this message instance
        :param str: byte array of serialized message, ``str``
        """
        try:
            end = 0
            _x = self
            start = end
            end += 7  # "<3Bf": three 1-byte values plus one 4-byte float
            (_x.start_or_pause, _x.stop_and_save, _x.closed, _x.speed_limit_kph,) = _get_struct_3Bf().unpack(str[start:end])
            # The bools travel as unsigned bytes; convert them back.
            self.start_or_pause = bool(self.start_or_pause)
            self.stop_and_save = bool(self.stop_and_save)
            self.closed = bool(self.closed)
            start = end
            end += 4
            (length,) = _struct_I.unpack(str[start:end])
            start = end
            end += length
            if python3:
                self.bag_name = str[start:end].decode('utf-8')
            else:
                self.bag_name = str[start:end]
            start = end
            end += 4
            (length,) = _struct_I.unpack(str[start:end])
            start = end
            end += length
            if python3:
                self.tpk_name = str[start:end].decode('utf-8')
            else:
                self.tpk_name = str[start:end]
            _x = self
            start = end
            end += 36  # "<9i": nine 4-byte integers
            (_x.rtk_mode, _x.driving_mode, _x.special_mode, _x.obs_search_strategy, _x.speed_mode, _x.obs_strategy, _x.follow_strategy, _x.cross_option, _x.reserved_option,) = _get_struct_9i().unpack(str[start:end])
            return self
        except struct.error as e:
            raise genpy.DeserializationError(e)  # most likely buffer underfill

    def serialize_numpy(self, buff, numpy):
        """
        serialize message with numpy array types into buffer
        :param buff: buffer, ``StringIO``
        :param numpy: numpy python module
        """
        # This message has no array fields, so the body matches serialize().
        try:
            _x = self
            buff.write(_get_struct_3Bf().pack(_x.start_or_pause, _x.stop_and_save, _x.closed, _x.speed_limit_kph))
            _x = self.bag_name
            length = len(_x)
            if python3 or type(_x) == unicode:
                _x = _x.encode('utf-8')
                length = len(_x)
            buff.write(struct.pack('<I%ss'%length, length, _x))
            _x = self.tpk_name
            length = len(_x)
            if python3 or type(_x) == unicode:
                _x = _x.encode('utf-8')
                length = len(_x)
            buff.write(struct.pack('<I%ss'%length, length, _x))
            _x = self
            buff.write(_get_struct_9i().pack(_x.rtk_mode, _x.driving_mode, _x.special_mode, _x.obs_search_strategy, _x.speed_mode, _x.obs_strategy, _x.follow_strategy, _x.cross_option, _x.reserved_option))
        except struct.error as se:
            self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self)))))
        except TypeError as te:
            self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))

    def deserialize_numpy(self, str, numpy):
        """
        unpack serialized message in str into this message instance using numpy for array types
        :param str: byte array of serialized message, ``str``
        :param numpy: numpy python module
        """
        # This message has no array fields, so the body matches deserialize().
        try:
            end = 0
            _x = self
            start = end
            end += 7
            (_x.start_or_pause, _x.stop_and_save, _x.closed, _x.speed_limit_kph,) = _get_struct_3Bf().unpack(str[start:end])
            self.start_or_pause = bool(self.start_or_pause)
            self.stop_and_save = bool(self.stop_and_save)
            self.closed = bool(self.closed)
            start = end
            end += 4
            (length,) = _struct_I.unpack(str[start:end])
            start = end
            end += length
            if python3:
                self.bag_name = str[start:end].decode('utf-8')
            else:
                self.bag_name = str[start:end]
            start = end
            end += 4
            (length,) = _struct_I.unpack(str[start:end])
            start = end
            end += length
            if python3:
                self.tpk_name = str[start:end].decode('utf-8')
            else:
                self.tpk_name = str[start:end]
            _x = self
            start = end
            end += 36
            (_x.rtk_mode, _x.driving_mode, _x.special_mode, _x.obs_search_strategy, _x.speed_mode, _x.obs_strategy, _x.follow_strategy, _x.cross_option, _x.reserved_option,) = _get_struct_9i().unpack(str[start:end])
            return self
        except struct.error as e:
            raise genpy.DeserializationError(e)  # most likely buffer underfill


# Compiled struct objects are created lazily and cached at module level.
_struct_I = genpy.struct_I
def _get_struct_I():
    global _struct_I
    return _struct_I
_struct_9i = None
def _get_struct_9i():
    global _struct_9i
    if _struct_9i is None:
        _struct_9i = struct.Struct("<9i")
    return _struct_9i
_struct_3Bf = None
def _get_struct_3Bf():
    global _struct_3Bf
    if _struct_3Bf is None:
        _struct_3Bf = struct.Struct("<3Bf")
    # NOTE(review): the published listing is cut off right after the
    # assignment above; this return is restored by analogy with
    # _get_struct_9i -- confirm against the generated file.
    return _struct_3Bf

Full Screen

Full Screen

label_directory_dataperf.py

Source:label_directory_dataperf.py Github

copy

Full Screen

from ast import parse
import glob
import argparse
import sys
import tty
import termios
from pathlib import Path
import csv
import os
import pydub
import pydub.playback
import pydub.effects


class _GetchUnix:
    """Read one keypress from stdin without waiting for Enter.

    Recipe from https://stackoverflow.com/a/510364
    """

    def __call__(self):
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setraw(sys.stdin.fileno())
            ch = sys.stdin.read(1)
        finally:
            # Always restore the terminal, even if the read is interrupted.
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
        return ch


def _load_already_listened(out_csv):
    """Return the set of clip names (first column) already saved in out_csv."""
    if not os.path.isfile(out_csv):
        return set()
    previous_file = Path(out_csv)
    print("previous results:", previous_file)
    with open(previous_file, "r", newline="") as fh:
        # rows are: file, good/bad; blank rows are skipped
        return {row[0] for row in csv.reader(fh) if row}


def _collect_wavs(split_csv, word, already_listened):
    """Return wav names from split_csv whose second column equals word."""
    wavs = []
    with open(split_csv, "r", newline="") as fh:
        for row in csv.reader(fh):
            if len(row) < 2 or row[1] != word:
                continue
            opus = row[0]  # e.g. aachen/common_voice_en_18833718.opus
            # NOTE(review): this replaces every "opus" substring, not just the
            # extension -- confirm no directory or file name contains "opus".
            wav = opus.replace("opus", "wav")
            if wav not in already_listened:
                wavs.append(wav)
    return wavs


def label(args):
    """Play each unlabeled clip for a word and record a good/bad verdict.

    Keys: "g" marks the clip good, "b" marks it bad, "s" stops and saves the
    verdicts collected so far, "q" exits immediately without saving. Any
    other key replays the clip.

    :param args: parsed arguments providing ``datadir``, ``splitdir``,
        ``word``, ``out_csv`` and ``dryrun``
    """
    already_listened = _load_already_listened(args.out_csv)

    datadir = Path(args.datadir)
    print("datadir:", datadir)
    splitdir = Path(args.splitdir)
    print("splitdir:", splitdir)

    # wav file names in the dev and test splits, dev first
    wav_list = []
    for split_name in ("en_dev.csv", "en_test.csv"):
        wav_list.extend(_collect_wavs(splitdir / split_name, args.word, already_listened))

    print("len(wav_list):", len(wav_list))

    getch = _GetchUnix()
    results = []
    for ix, clip in enumerate(wav_list):
        print(f"\n:::::: CLIP # {ix} :::", clip)
        fpath = str(datadir / clip)
        # Load and normalize once; replays reuse the same segment.
        wav = pydub.effects.normalize(pydub.AudioSegment.from_wav(fpath))
        usable = None  # stays None until the listener gives a verdict
        while usable is None:
            print("-")
            pydub.playback.play(wav)
            choice = getch()
            if choice == "g":
                usable = True
            elif choice == "b":
                usable = False
            elif choice == "s":
                break
            elif choice == "q":
                sys.exit()
        if usable is None:
            # "s" was pressed: stop part of the way through a word. The
            # current clip has no verdict, so it is not recorded.
            break
        row = [clip, "good" if usable else "bad"]
        print(row)
        results.append(row)

    if not args.dryrun:
        out_csv = args.out_csv
        with open(out_csv, "a", newline="") as fh:  # append to existing file
            writer = csv.writer(fh)
            writer.writerows(results)

    # summary
    print("\n\n\n\n:::::::::: SUMMARY ")
    print("num good:", len([g for g in results if g[1] == "good"]))
    print("num bad:", len([g for g in results if g[1] == "bad"]))
    if not args.dryrun:
        print(f">>>>>> results written to {out_csv}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="labeler for listening data")
    parser.add_argument("datadir", help="directory of wavs")
    parser.add_argument("splitdir", help="directory of split csvs")
    parser.add_argument("word", help="word to analyze")
    parser.add_argument("out_csv", help="output filepath. location of checkpoint if previously saved")
    parser.add_argument('--dryrun', action="store_true", help="do not write to output csv file")
    group = parser.add_mutually_exclusive_group()
    # NOTE(review): the published listing is truncated here; the rest of the
    # argument setup and the call into label() are not shown on the page.

Full Screen

Full Screen

main_time_tracker.py

Source:main_time_tracker.py Github

copy

Full Screen

...14 #data = generate_some_data_example()15 data = loadtxt('data.csv', delimiter=',')16 #data = loadtxt('Data_Luca.csv', delimiter=',')17 Y = data18 TimeTracker.stop_and_save('load_data', tic)19 #modify and pick a subset of Y for calculating robust parameters on Y_training20 tic = TimeTracker.start()21 num_clusters= 522 num_classes_training = 323 Y_training, num_classes_training = get_training_set_example(Y, num_clusters, num_classes_training)24 # num_classes_training = 225 # Y_training = numpy.vstack([Y[0:299, :], Y[600:899, :]])26 TimeTracker.stop_and_save('get_training_set', tic)27 #automatic robust parameters from y_training and num of training classes28 tic = TimeTracker.start()29 list_robust_mean, list_inv_cov_mat = calculate_robust_parameters(Y_training, num_classes_training)30 TimeTracker.stop_and_save('calculate_robust_parameters', tic)31 # STEP 232 #modify specify_user_input to change hyperparameters_setter to match your data33 tic = TimeTracker.start()34 user_input_parameters = specify_user_input(list_robust_mean, list_inv_cov_mat, Y)35 TimeTracker.stop_and_save('specify_user_input', tic)36 #!the rest of the code is automatic37 #automatic set of hyperparameters from previously specified user input38 tic = TimeTracker.start()39 hyperparameters_model: HyperparametersModel = set_hyperparameters(user_input_parameters, Y)40 TimeTracker.stop_and_save('set_hyperparameters', tic)41 #CAVI (init + update + elbo)42 tic = TimeTracker.start()43 variational_parameters, elbo_values = cavi_time_tracker(Y, list_robust_mean, hyperparameters_model, user_input_parameters)44 TimeTracker.stop_and_save('cavi', tic)45 #Generate figure of induced partition46 tic = TimeTracker.start()47 generate_induced_partition(Y, list_robust_mean, hyperparameters_model, variational_parameters, cov_ellipse=False)48 generate_induced_partition(Y, list_robust_mean, hyperparameters_model, variational_parameters, cov_ellipse=True)49 TimeTracker.stop_and_save('generate_induced_partition', tic)50 
#Plot elbo51 tic = TimeTracker.start()52 generate_elbo_plot(elbo_values)53 TimeTracker.stop_and_save('generate_elbo_plot', tic)54 #Performance55 TimeTracker.get_performance()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful