Best Python code snippet using slash
spectral_recorder.py
Source:spectral_recorder.py  
1#!/usr/bin/env python2#3#4# This program is free software: you can redistribute it and/or modify5# it under the terms of the GNU General Public License as published by6# the Free Software Foundation, either version 3 of the License, or7# (at your option) any later version.8# 9# This program is distributed in the hope that it will be useful,10# but WITHOUT ANY WARRANTY; without even the implied warranty of11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the12# GNU General Public License for more details.13# 14# You should have received a copy of the GNU General Public License15# along with this program. If not, see <http://www.gnu.org/licenses/>.16import array17import struct18import sys19import time20import os21import json22from subprocess import call23from operator import itemgetter24from sklearn import preprocessing25import numpy as np26import matplotlib27matplotlib.use('TkAgg')28import matplotlib.pyplot as plt29import numpy as np30class SpectralRecorder:31	phy = "phy0"32	dev = "wlan0"33	drv= "ath9k"34	mode="manual"		#works35	fft_period=1536	spectral_count=10037	spectral_period=138	short_repeat=139	f0=2437e640	def __init__(self, phy="self.phy0", dev="wlan0", drv="ath9k",41			mode="manual", fft_period=15, spectral_count=100, 	spectral_period=1,short_repeat=1,42		load=False,offline=True,freq=2437e6):43		self.phy 		= self.phy44		self.dev 		= self.dev45		self.drv		= drv46		self.mode		= mode47		self.fft_period		= fft_period48		self.spectral_count	= spectral_count49		self.spectral_period	= spectral_period50		self.short_repeat	= short_repeat51		self.f0			= freq #optional useful only if load is True52		#set spectral parameters53		if offline==False:54			self.set_spectral_params()55		if load:56			self.load_monitor()57	def get_spectral_params(self):58		return self.fft_period,self.spectral_count,self.spectral_period,self.short_repeat59	def load_monitor(self):60		#call("bash build.sh --load-module",shell=True)61		call("ifconfig "+self.dev+" down",shell=True)62		call("iwconfig "+self.dev+" mode monitor",shell=True)63		call("ifconfig "+self.dev+" up",shell=True)64		call("iwconfig "+self.dev+" freq "+str(self.f0),shell=True)65	def set_spectral_params(self):66		call("echo "+ self.mode		  		+ " > /sys/kernel/debug/ieee80211/" +self.phy+ "/"+ self.drv +"/spectral_scan_ctl", shell=True)67		call("echo "+ str(self.short_repeat)	  	+ " > /sys/kernel/debug/ieee80211/" +self.phy+ "/"+ self.drv +"/spectral_short_repeat",shell=True)68		call("echo "+ str(self.fft_period)	  	+ " > /sys/kernel/debug/ieee80211/" +self.phy+ "/"+ self.drv +"/spectral_fft_period",shell=True)69		call("echo "+ str(self.spectral_count-1) 	+ " > /sys/kernel/debug/ieee80211/" +self.phy+ "/"+ self.drv +"/spectral_count",shell=True)70		call("echo "+ str(self.spectral_period)		+ " > /sys/kernel/debug/ieee80211/" +self.phy+ "/"+ self.drv +"/spectral_period",shell=True)71	def acquire(self,filename="data",T_acquire=1,T=0.1):72		73		call("cat /sys/kernel/debug/ieee80211/" + self.phy + "/"+ self.drv +"/spectral_scan0 > {}".format(filename), shell=True)74		time.sleep(T)75		t0=time.time()76		t0_a=t077		now=t078		now_a=t079		while now_a-t0_a < T_acquire:80			now_a=time.time()81			while now-t0 < T:82				now=time.time()83				call("echo trigger > /sys/kernel/debug/ieee80211/" + self.phy + "/"+ self.drv +"/spectral_scan_ctl", shell=True)84			now=time.time()85			t0=now86			call("cat /sys/kernel/debug/ieee80211/" + self.phy + "/"+ self.drv +"/spectral_scan0 >> {}".format(filename), shell=True)87			#print(now_a-t0_a)88	def fix_timestamp_dict(self,samp_dict,T=50e3):89		ret=samp_dict90		timestamp=[v['tsf'] for v in samp_dict]91		timestamp=np.array(timestamp)-timestamp[0]92		decr=093		if len(timestamp)!=0:94			tsf_diff=[]95			for i_t in range(0,len(timestamp)-1):96				t_=timestamp[i_t]97				t=timestamp[i_t+1]98				dt=t-t_99				print("t[{}]={}".format(i_t,t))100				print("t[{}]_={}".format(i_t,t_))101				if abs(dt) > T:102					#EMERGE ERROR, TSF chagnes103					print("[E] t[{}]={}".format(i_t,t))104					print("[E] t_[{}]={}".format(i_t,t_))105					ret.pop(i_t+1)106					decr=decr+1107				else:108					tsf_diff.append(dt)109					tsf_new=sum(tsf_diff)110					ret[i_t-decr]['tsf']=tsf_new111		else:112			print("timestamp is empty")113		ret.pop(len(ret)-1)114		return ret115	def fix_timestamp(self,timestamp,busy,x,T=50e3):116		if len(timestamp)!=0:117			timestamp=np.array(timestamp)-timestamp[0]118			tsf_diff=[]119			for i_t in range(0,len(timestamp)-1):120				t_=timestamp[i_t]121				t=timestamp[i_t+1]122				dt=t-t_123				#print("t={}".format(t))124				#print("t_={}".format(t_))125				if abs(dt) > T:126					#EMERGE ERROR, TSF chagnes127					print("t={}".format(t))128					print("t_={}".format(t_))129					busy.pop(i_t)130					x.pop(i_t)131				else:132					tsf_diff.append(dt)133			timestamp=np.cumsum(tsf_diff)134			busy=busy[0:len(timestamp)]135		else:136			print ("timestamp is empty")137		return timestamp,busy,x138	def extract_samples(self,filename="data",out_file="out_samp.json",T=-1):139		y = []140		p_fft=[]141		freq_fft=[]142		busy = []143		timestamp = []144		out_samp=[]145		take_all_samples=False;146		if T == -1:147			take_all_samples=True;148		with open(filename, "rb") as file:149			data = file.read(76)150			i_pos_tmp=0151#			while data != "":152			while data:153				i_pos_tmp=i_pos_tmp+1154				y_t = []155				x = []156				t, length = struct.unpack(">BH", data[0:3])157				if t != 1 or length != 73:158				    print("only 20MHz supported atm")159				    sys.exit(1)160				### metadata161				max_exp, freq, rssi, noise, max_magnitude, max_index, bitmap_weight, tsf = struct.unpack('>BHbbHBBQ', data[3:20])162				### measurements163				measurements = array.array("B")164				measurements.fromstring(data[20:])165				squaresum = sum([(m << max_exp)**2 for m in measurements])166				if squaresum == 0:167				    data = file.read(76)168				    continue169				fft_sub=[]170				for i, m in enumerate(measurements):171					if m == 0 and max_exp == 0:172						m = 1173					v = 10.0**((noise + rssi + 20.0 * np.log10(m << max_exp) - 10.0 * np.log10(squaresum))/10.0)174					fft_sub.append(v)175				entry={}176				entry['tsf']=tsf177				timestamp.append(int(tsf))178				entry['freq']=freq179				entry['rssi']=rssi180				entry['noise']=noise181				entry['fft_sub']=fft_sub182				out_samp.append(entry)183				data = file.read(76)184				if not(take_all_samples):185					if int(timestamp[len(timestamp)-1])-int(timestamp[0]) < T:186						data = file.read(76)187						continue;188					else:189					    	break190		#with open(out_file, 'w') as file:191		#	out_json=json.dumps(out_samp)192		#	file.write(out_json)193		return out_samp194	def get_feature(self,busy,timestamp):195		W = 1e3; #usec196		ts = timestamp-timestamp[0]197		t_= ts[0];198		t = ts[1];199		b_curr=[];200		b_mean=[];201		b_var =[];202		for b in range(0,len(busy)):203			if t-t_ < W:204				t = ts[b]205			else:206				t_= ts[b]207				b_mean.append(np.mean(b_curr))208				b_var.append(np.var(b_curr))209				b_curr=[]210			b_curr.append(busy[b])211		return b_mean, b_var212	def rrc_f(self, T=1, beta=0.8):213		out = []214		for f in np.linspace(-1 / (2.0 * T), 1 / (2.0 * T), num=5):215			if abs(f) <= (1 - beta) / float(2.0 * T):216				out.append(1.0)217			else:218				if (1 - beta) / float(2.0 * T) < abs(f) and abs(f) <= (1 + beta) / float(2.0 * T):219					v = 0.5 * (1 + np.cos(np.pi * T / float(beta) * (abs(f) - (1 - beta) / float(2.0 * T))))220					out.append(v)221				else:222					out.append(0)223		return out224	def get_freq_list(self,freq, N=1):225		ff = []226		for i in range(0, 56):227			# if m == 0 and max_exp == 0:228			#    m = 1229			if i < 28:230				fr = freq - (20.0 / 64) * (28 - i)231			else:232				fr = freq + (20.0 / 64) * (i - 27)233			ff.append(fr)234		fff = []235		#Overasmpling: unused if N=1236		for f in ff:237			for o in range(0, N):238				fff.append(f + o * (20.0 / 64 / N))239		return fff240	def get_spectrum_scan_features(self,filename="demo.tlv",T=100e3):241		skip = False242		thr_bw = 0.05243		thr_corr = 1e-10244		thr_corr_mean = 1245		P_thr_db = -75246		P_thr = 10 ** (P_thr_db / 10.0)247		dt_thr = 2600248		# OUTPUT FEATURES249		spectrum_features = []250		duration_features = []251		duration_energy_det_features = []252		measurements = self.extract_samples(filename, out_file="not-in-use.json",T=T)253		power_features=[]254		mark_as_failure = False255		csi_data = list(map(itemgetter('fft_sub'), measurements))256		csi_data = np.array(csi_data)257		freq = list(map(itemgetter('freq'), measurements))258		tsf = list(map(itemgetter('tsf'), measurements))259		freq = list(set(freq))260		freq = freq[0]261		ff = self.get_freq_list(freq)262		y = np.array(csi_data[0])263		y_ = y264		y_nofilt_ = y265		tt = tsf[0]266		tt_ = tt;267		PLOT = False268		if PLOT:269			fig = plt.figure();270			plt.ion()271			plt.show()272		start_corr = True273		corr_duration = 0;274		energy_det_duration = 0;275		t_corr_start = 0;276		t_energy_det_start = 0;277		y_cont=[]278		P_av_w=[]279		P_av_=0280		print("====================================");281		p_av_list = np.convolve(np.mean(csi_data, axis=1), np.array([1, 1, 1, 1])[::-1], 'same')282		for ii, cc in enumerate(csi_data):283			if ii==0:284				start_energy_det = True285				finish_energy_det = False286			skip = False287			yy = []288			start_f = []289			stop_f = []290			START_BW = True291			y_pow = []292			bw_meas = []293			freq_meas = []294			y = np.array(cc)295			tt = tsf[ii]296			dt = tt - tt_297			y_nofilt = y298			weights = self.rrc_f()299			#weights = [1, 1, 1]300			y = np.convolve(y, np.array(weights)[::-1], 'same')301			min_max_scaler = preprocessing.MinMaxScaler()302			y_det = min_max_scaler.fit_transform(y.reshape(-1, 1))303			y_det_ = min_max_scaler.fit_transform(y_.reshape(-1, 1))304			y_det_nofilt = min_max_scaler.fit_transform(y_nofilt.reshape(-1, 1))305			y_det_nofilt_ = min_max_scaler.fit_transform(y_nofilt_.reshape(-1, 1))306			y_det = y_det[:, 0]307			y_det_ = y_det_[:, 0]308			y_det_nofilt = y_det_nofilt[:, 0]309			y_det_nofilt_ = y_det_nofilt_[:, 0]310			P_av = np.mean(y)311			P_av_w = p_av_list[ii]312			power_features.append(313			{"tsf_p": tt, "p_av": P_av,"p_av_w": P_av_w})314			P_av=P_av_w315			# if len(P_av_w) >N_av: #window size = 5316			# 	P_av_w.pop(0) #window step=1317			# ENERGY DETECTION318			# if P_av_ < P_thr and P_av > P_thr and dt < dt_thr:319			# 	start_energy_det=True;320			# 	t_energy_det_start = tt321			# else:322			# 	if P_av_ > P_thr and P_av < P_thr and dt < dt_thr:323			# 		start_energy_det = False;324			# 		energy_det_duration = tt - t_energy_det_start325			# 		duration_energy_det_features.append(326			# 		 	{"tsf": t_energy_det_start, "duration": energy_det_duration})327			# 	else:328			if P_av >= P_thr and start_energy_det:329						t_energy_det_start = tt;330						print("start:{}",t_energy_det_start);331						start_energy_det = False332						mark_as_failure = False;333			if P_av <= P_thr and not start_energy_det:334						t_energy_det_stop = tt_;335						print("stop:{}", t_energy_det_stop);336						finish_energy_det = True337			if P_av >= P_thr and not start_energy_det:338				if ii < len(tsf)-1:339					if tsf[ii+1]-tsf[ii] > dt_thr:340						mark_as_failure=True341						print("[ii={}] failure dt={}".format(ii,tsf[ii+1]-tsf[ii]))342			if P_av <= P_thr and finish_energy_det:343				if not mark_as_failure:344					print(" [ OK ] finish:{}", t_energy_det_stop-t_energy_det_start);345				else:346					print(" [FAIL] finish:{}", t_energy_det_stop - t_energy_det_start);347				finish_energy_det=False348				start_energy_det=True349				if not mark_as_failure:350						# duration_energy_det_features.append(351						# {"tsf": t_energy_det_start, "duration": t_energy_det_stop-t_energy_det_start})352					duration_energy_det_features.append(353						{"tsf": t_energy_det_start, "tsf_stop": t_energy_det_stop, "duration": t_energy_det_stop - t_energy_det_start})354			print(dt)355			# else:356			# 	if start_energy_det == False:357			# 		energy_det_duration = tt - t_energy_det_start358			# 		start_energy_det = True359			# 		if energy_det_duration != 0:360			# 			if not mark_as_failure:361			# 				duration_energy_det_features.append(362			# 					{"tsf": t_energy_det_start, "duration": energy_det_duration})363			# 			else:364			# 				print("FAILED")365			# 				print({"tsf": t_energy_det_start, "duration": energy_det_duration})366			# 				mark_as_failure=False;367			# 		energy_det_duration = 0368			P_av_ = P_av369			#print(duration_energy_det_features)370			yy = y_det371			N_av = 10372			if len(y_cont) >= N_av:373				y_cont.pop(0)  # window step=1374			if P_av >= P_thr:375				# collect some consecutive samples and average it!376				y_cont.append(yy)377				yy = np.mean(y_cont, axis=0)378				if len(y_cont) == N_av:379					for i in range(0, len(yy)):380						#print(yy[i])381						if yy[i] > thr_bw and i < len(yy) - 1:382							y_pow.append(yy[i])383							if START_BW:384								# print "START"385								start_f.append(ff[i])386								START_BW = False387						else:388							if not START_BW:389								stop_f.append(ff[i - 1])390								START_BW = True391				for i in range(0, min(len(start_f), len(stop_f))):392					bw_meas.append(stop_f[i] - start_f[i] - 20/64.0)393					freq_meas.append((stop_f[i] + start_f[i]) / 2.0)394				# CORRELATION395				y_corr = np.correlate(y_det, y_det_, "same")396				# y_corr = np.correlate(y_det_nofilt, y_det_nofilt_, "same")397				if dt >= dt_thr:398					start_corr = True399					corr_duration = 0400				else:401					# if np.median(y_corr) <= thr_corr_mean and P_av >= P_thr:402					if np.median(y_corr) <= thr_corr_mean:403						if start_corr:404							t_corr_start = tt;405							start_corr = False406						else:407							corr_duration = tt - t_corr_start408					else:409						if start_corr == False:410							start_corr = True411							if corr_duration != 0:412								duration_features.append({"tsf": t_corr_start, "duration": corr_duration})413							corr_duration = 0414				spectrum_features.append({"tsf": tsf[ii], "bw": bw_meas, "freq": freq_meas})415			else:416				y_cont = []417			# update state variables, ready for next round418			tt_ = tt419			y_ = y420			y_nofilt_ = y_nofilt421			#if P_av <= P_thr:422			#	skip = True423			#else:424		return measurements, spectrum_features, duration_energy_det_features, duration_features,freq,power_features425	def plot_waterfall(self,ax,measurements,exp_name):426		csi_data = list(map(itemgetter('fft_sub'), measurements))427		timestamp = list(map(itemgetter('tsf'), measurements))428		freq = list(map(itemgetter('freq'), measurements))429		freq=list(set(freq))430		freq=freq[0]431		timestamp=np.array(timestamp)-timestamp[0]432		T=np.max(timestamp)433		N=len(timestamp)434		y=self.get_freq_list(freq)435		x=timestamp[0:N]436		Z=10.0 * np.log10(csi_data[0:N])437		X,Y = np.meshgrid(y,x)438		cc=ax.pcolormesh(X,Y,Z,vmin=-140, vmax=-20)439		fmin=min(y)440		fmax=max(y)441		ax.set_xlim([fmin,fmax])442		ax.set_ylim([0,T])443		ax.set_ylabel('Time [us]')444		ax.set_xlabel('freq [MHz]')445		ax.set_title(exp_name)446		return ax447	def get_freq_list(self,freq, N=1):448		ff = []449		for i in range(0, 56):450			# if m == 0 and max_exp == 0:451			#    m = 1452			if i < 28:453				fr = freq - (20.0 / 64) * (28 - i)454			else:455				fr = freq + (20.0 / 64) * (i - 27)456			ff.append(fr)457		fff = []458		for f in ff:459			for o in range(0, N):460				fff.append(f + o * (20.0 / 64 / N))461		return fff...test_database.py
Source:test_database.py  
...30        tid3 = uuid()31        try:32            raise KeyError('foo')33        except KeyError as exception:34            self.b.mark_as_failure(tid3, exception)35        assert self.b.get_status(tid3) == states.FAILURE36        assert isinstance(self.b.get_result(tid3), KeyError)37    def xxx_backend(self):38        tid = uuid()39        assert self.b.get_status(tid) == states.PENDING40        assert self.b.get_result(tid) is None41        self.b.mark_as_done(tid, 42)42        assert self.b.get_status(tid) == states.SUCCESS43        assert self.b.get_result(tid) == 4244        tid2 = uuid()45        try:46            raise KeyError('foo')47        except KeyError as exception:48            self.b.mark_as_failure(tid2, exception)49        assert self.b.get_status(tid2) == states.FAILURE50        assert isinstance(self.b.get_result(tid2), KeyError)51    def test_forget(self):52        tid = uuid()53        self.b.mark_as_done(tid, {'foo': 'bar'})54        x = self.app.AsyncResult(tid)55        assert x.result.get('foo') == 'bar'56        x.forget()57        if celery.VERSION[0:3] == (3, 1, 10):58            # bug in 3.1.10 means result did not clear cache after forget.59            x._cache = None...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
