Best Python code snippet using fMBT_python
SamplingAccuracyEvaluation.py
Source:SamplingAccuracyEvaluation.py  
from SamplingAccuracyEvaluation import SamplingAlgorithm as SA
from SamplingAccuracyEvaluation import AccuracyEvaluation as AE
from SamplingAccuracyEvaluation import PrintGraph as PG
from SamplingAccuracyEvaluation import StatisticalCalculation as SC
import operator


def populationListGenerate(filePath, target):
    """Read one column of a comma-separated text file into a list of strings.

    Args:
        filePath: Path of the text file to read.
        target: Index of the comma-separated field to take from every line.

    Returns:
        A list holding the ``target``-th field of each line, in file order.
        Fields are returned as raw strings; the last field of a line keeps
        its trailing newline.

    Raises:
        IndexError: If a line has no field at index ``target``.
    """
    print('Generate Population List')
    # The context manager closes the file even when a short line raises
    # IndexError part-way through the read.
    with open(filePath, 'r') as populationFile:
        return [line.split(',')[target] for line in populationFile]


def calculateScore(evalList):
    """Fold a list of accuracy measures into a single score.

    The absolute value of the first measure is divided by 4 and that of
    every later measure by 3; the quotients are added up in list order.

    Args:
        evalList: Sequence of numeric accuracy measures.

    Returns:
        The summed score (``0`` for an empty sequence).
    """
    score = 0
    for position, measure in enumerate(evalList):
        divisor = 4 if position == 0 else 3
        score = score + abs(measure) / divisor
    return score


def run(windowSize, sampleSize, filePath, target=0):
    """Evaluate three sampling algorithms over consecutive data windows.

    The population read from ``filePath`` is cut into consecutive,
    non-overlapping windows of ``windowSize`` items. For every full window a
    sample of ``sampleSize`` is drawn with the reservoir, hash and priority
    samplers from ``SA``, each sample is evaluated against the window with
    ``AE.run``, and the per-measure results are accumulated with
    ``SC.sumPerIndex``. The accumulated measures are then averaged, scored
    with ``calculateScore`` and sorted by ascending score.

    NOTE(review): the listing this function comes from is cut off after the
    sort, so what the function prints or returns afterwards is not visible.

    Args:
        windowSize: Number of population items per evaluation window.
        sampleSize: Sample size passed to each sampling algorithm.
        filePath: Path of the comma-separated population file.
        target: Column index read from the file (default ``0``).
    """
    print('############## Sampling Accuracy Evaluation ##############')
    count = 1
    numOfTrials = 1
    jSDPieceCount = 20
    pAAPieceCount = 20
    print('Window Size: ' ,windowSize)
    print('Sample Size: ' ,sampleSize)
    print('JSD Piece Count: ' ,jSDPieceCount)
    print('PAA Piece Count: ' ,pAAPieceCount)
    populationList = populationListGenerate(filePath, target)
    windowList = []
    accuracyMeasureCount = 3
    evalDic = {}
    # One running sum per accuracy measure and per sampling algorithm.
    # Presumably SC.sumPerIndex adds its second argument into the first in
    # place -- TODO confirm against StatisticalCalculation.
    reservoirEvalList = [0.0 for _ in range(accuracyMeasureCount)]
    hashEvalList = [0.0 for _ in range(accuracyMeasureCount)]
    priorityEvalList = [0.0 for _ in range(accuracyMeasureCount)]
    print()
    for data in populationList:
        windowList.append(data)
        if count == windowSize:
            print('################## ' + str(numOfTrials) + ' Evaluation Start ####################')
            # if numOfTrials == 1: PG.printGraph(windowList, 'Population', numOfTrials)
            print()
            print(str(numOfTrials)+'_ReservoirSampling')
            sampleList = SA.sortedReservoirSam(sampleSize, windowList)
            tempEvalList = AE.run(windowList, sampleList, jSDPieceCount, pAAPieceCount)
            SC.sumPerIndex(reservoirEvalList, tempEvalList)
            # if numOfTrials == 1: PG.printGraph(sampleList, 'Reservoir', numOfTrials)
            print()
            print(str(numOfTrials)+'_HashSampling')
            sampleList = SA.hashSam(sampleSize, windowList)
            tempEvalList = AE.run(windowList, sampleList, jSDPieceCount, pAAPieceCount)
            SC.sumPerIndex(hashEvalList, tempEvalList)
            # if numOfTrials == 1: PG.printGraph(sampleList, 'Hash', numOfTrials)
            print()
            print(str(numOfTrials)+'_PrioritySampling')
            sampleList = SA.sortedPrioritySam(sampleSize, windowList)
            tempEvalList = AE.run(windowList, sampleList, jSDPieceCount, pAAPieceCount)
            SC.sumPerIndex(priorityEvalList, tempEvalList)
            # if numOfTrials == 1: PG.printGraph(sampleList, 'Priority', numOfTrials)
            print()
            numOfTrials = numOfTrials + 1
            # Reset to 0 here; the unconditional increment below brings the
            # counter back to 1 for the first item of the next window.
            count = 0
            windowList = []
        count = count + 1
    # NOTE(review): numOfTrials starts at 1 and is incremented after every
    # evaluated window, so here it equals (evaluated windows + 1). The sums
    # are divided by that value rather than by the number of evaluated
    # windows -- confirm whether this is intended.
    for i in range(accuracyMeasureCount):
        reservoirEvalList[i] = reservoirEvalList[i] / numOfTrials
        hashEvalList[i] = hashEvalList[i] / numOfTrials
        priorityEvalList[i] = priorityEvalList[i] / numOfTrials
    evalDic['RESERVOIR_SAMPLING'] = calculateScore(reservoirEvalList)
    evalDic['HASH_SAMPLING'] = calculateScore(hashEvalList)
    evalDic['PRIORITY_SAMPLING'] = calculateScore(priorityEvalList)
    # (name, score) pairs ordered by ascending score.
    sortedEvalList = sorted(evalDic.items(), key = operator.itemgetter(1))
...
IDT.py
Source:IDT.py  
from SummaryClass import SummaryClass
import constants, time, Data
def calculateIdtAlgorithm(pointsList):
    """Group gaze points into fixations using time and dispersion thresholds.

    Every input point is wrapped in a ``Data.DataIDT`` record. Points whose
    ``Type`` is ``'SS'`` are skipped. A window is first filled with points
    whose timestamp lies within ``constants.WINDOW_TIME_THRESHOLD`` of the
    window start; its dispersion is ``(max X - min X) + (max Y - min Y)``.
    While the dispersion stays under ``constants.DISPERSION_THRESHOLD`` the
    window keeps growing; it is then recorded as one fixation (centroid,
    point count, duration). Otherwise the oldest point of the window is
    dropped.

    NOTE(review): the listing this function comes from is cut off after the
    final ``time.process_time()`` call, so the return value is not visible.

    Args:
        pointsList: Sequence of point objects exposing ``Type``,
            ``TimeStamp``, ``CoordX`` and ``CoordY``.
    """
    start = time.process_time()
    i = 0
    idtPointList = []
    # Presumably the last two DataIDT arguments are the Fixation flag and the
    # point's Index, since both attributes are used below -- verify against Data.
    for e, element in enumerate(pointsList):
        idtPointList.append(Data.DataIDT(element.Type, element.TimeStamp, element.CoordX, element.CoordY, False, e))
    timeStart = int(idtPointList[0].TimeStamp)
    windowList = []
    coordXList = []
    coordYList = []
    retList = []
    summaryList = []
    countPoints = len(idtPointList)
    countFix = 0
    while i < countPoints - 1:
        if idtPointList[i].Type == 'SS':
            i += 1
            continue
        currTime = int(idtPointList[i].TimeStamp)
        # Fill the window with every point inside the time threshold.
        while currTime - timeStart <= constants.WINDOW_TIME_THRESHOLD:
            windowList.append(idtPointList[i])
            i += 1
            if i >= countPoints:
                break
            currTime = int(idtPointList[i].TimeStamp)
        if i >= countPoints:
            break
        if len(windowList) > 1:
            Dispersion = (max(maxX.CoordX for maxX in windowList) - min(minX.CoordX for minX in windowList)) + (max(maxY.CoordY for maxY in windowList) - min(minY.CoordY for minY in windowList))
        while len(windowList) > 1:
            if Dispersion <= constants.DISPERSION_THRESHOLD and len(windowList) > 1:
                # Grow the window until the dispersion reaches the threshold.
                while (Dispersion < constants.DISPERSION_THRESHOLD):
                    # NOTE(review): this `continue` skips the `i >= countPoints`
                    # check below, so trailing 'SS' points could index past the
                    # end of idtPointList -- confirm.
                    if idtPointList[i].Type == 'SS':
                        i += 1
                        continue
                    windowList.append(idtPointList[i])
                    i += 1
                    if i >= countPoints:
                        break
                    Dispersion = (max(maxX.CoordX for maxX in windowList) - min(minX.CoordX for minX in windowList)) + (max(maxY.CoordY for maxY in windowList) - min(minY.CoordY for minY in windowList))
                if i >= countPoints:
                    break
                retList.extend(windowList)
                # The loop variables named `min`/`max` only shadow the
                # builtins inside their own generator expressions.
                minWind = min(min.Index for min in windowList)
                maxWind = max(max.Index for max in windowList)
                # Flags indices minWind .. maxWind - 1; the point at maxWind
                # itself is not flagged.
                while minWind < maxWind:
                    idtPointList[minWind].Fixation = True
                    minWind += 1
                # Fixation centroid and duration over the recorded window.
                pointX = sum(sumX.CoordX for sumX in windowList) / len(windowList)
                pointY = sum(sumY.CoordY for sumY in windowList) / len(windowList)
                duration = int(max(maxTime.TimeStamp for maxTime in windowList)) - int(min(minTime.TimeStamp for minTime in windowList))
                summaryList.append(SummaryClass(pointX, pointY, len(windowList), duration))
                countFix += len(windowList)
                coordXList.append(pointX)
                coordYList.append(pointY)
                windowList = []
            else:
                # Dispersion is not recomputed after the pop, so the stale
                # value keeps this branch active until one point is left.
                windowList.pop(0)
        if i <= countPoints - 1:
            timeStart = int(pointsList[i].TimeStamp)
    end = time.process_time()
...
client.py
Source:client.py  
# UDP client script: builds 'Message #<i>' strings, keeps up to windowSize of
# them in windowList, and sends each one with an index and an ip_checksum
# value to host:port over a datagram socket, waiting for replies with
# recvfrom.
# NOTE(review): this is Python 2 code (`print` statements, `import Queue`).
# NOTE(review): the listing is not runnable as captured -- indentation is
# inconsistent, `if lastnum == (len(windowList) -1:` has an unbalanced
# parenthesis, the ''' opened further down is never closed, and the listing
# is cut off at the end.
# NOTE(review): `timeout = 1` rebinds the name imported by
# `from socket import timeout`, so the later `except timeout:` would refer
# to the integer, not the exception class.
from socket import timeout
import socket
import sys
import time
from check import ip_checksum
import random
import select
import Queue
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
inputs = [s]
outputs= [s]
timeout = 1
host='localhost'
port =40213
seq = 0
numPackets = 10
windowSize = 3
windowList = []
msgList = []
s.setblocking(0)
s.settimeout(1)
	for i in range(numPackets):
		msgList.append('Message #' + str(i))
#fill the queue initially before the algorithm can take over
	for i in range(min(numPackets,windowSize)):
		windowList.append(msgList.pop(0))
	while len(msgList) != 0 or len(windowList) != 0:
	for i in range(min(numPackets,windowSize - len(windowList))):
		windowList.insert(0,msgList.pop(0))
		lastnum =0
	while lastnum != (len(windowList) - 1):
		for i in range(len(windowList)):
			checksum = ip_checksum(windowList[i])
			msg = str(i) + ' ' + windowList[i]+ ' ' + checksum
			print msg
			s.sendto(msg,(host,port))
		 try:
			d = s.recvfrom(1024)
			print d, ' ' , d[0][0]
			lastnum+=1
		except socket.timeout:
			print ''
	if lastnum == (len(windowList) -1:
		windowList.clear()
				     '''
				     for i in range(10):
				     msg= 'Message #'+str(i)
				     checksum = ip_checksum(msg)
				     error = random.random()
#if error>0.75:
#	checksum=checksum+str(1)
				     s.sendto(str(seq)+' '+msg + '' + checksum ,(host,port))
				     print 'Sent: ', str(seq),' ', msg , '' , checksum
				     try:
				     d = s.recvfrom(1024)
				     print d[0]
				     if d[0].startswith('ACK'):
				     seq= 1 - seq		
				     except timeout:
				     print 'Timeout'
# new code
		     for lastnum in range(0,len(windowList)):
			     if len(windowList) == 0
	break
	     time.sleep(2)
	print lastnum, windowList
	       checksum = ip_checksum(windowList[i])
	       msg = str(i) + ' ' + windowList[i]+ ' ' + checksum
	print msg
	       s.sendto(msg,(host,port))
	       try:
		       d = s.recvfrom(1024)
		       print d, ' ' , d[0][0]
		       del windowList[int(d[0][0])]
		       lastnum	
		       except socket.timeout:
		       print ''
		       lastnum+=1
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
