Best Python code snippet using hypothesis
analyzer.py
Source:analyzer.py  
1#!/usr/bin/env python32import argparse3import datetime4import json5import matplotlib.pyplot as plt6import sys7class UidSnapshot(object):8    def __init__(self, activity):9        self.uid = activity['uid']10        self.foregroundWrittenBytes = activity['foregroundWrittenBytes']11        self.foregroundFsyncCalls = activity['foregroundFsyncCalls']12        self.backgroundFsyncCalls = activity['backgroundFsyncCalls']13        self.backgroundWrittenBytes = activity['backgroundWrittenBytes']14        self.appPackages = activity['appPackages']15        self.runtimeMs = activity['runtimeMs']16        self.totalWrittenBytes = self.foregroundWrittenBytes + self.backgroundWrittenBytes17        self.totalFsyncCalls = self.backgroundFsyncCalls + self.foregroundFsyncCalls18        if self.appPackages is None: self.appPackages = []19class Snapshot(object):20    def __init__(self, activity, uptime):21        self.uptime = uptime22        self.uids = {}23        self.foregroundWrittenBytes = 024        self.foregroundFsyncCalls = 025        self.backgroundFsyncCalls = 026        self.backgroundWrittenBytes = 027        self.totalWrittenBytes = 028        self.totalFsyncCalls = 029        for entry in activity:30            uid = entry['uid']31            snapshot = UidSnapshot(entry)32            self.foregroundWrittenBytes += snapshot.foregroundWrittenBytes33            self.foregroundFsyncCalls += snapshot.foregroundFsyncCalls34            self.backgroundFsyncCalls += snapshot.backgroundFsyncCalls35            self.backgroundWrittenBytes += snapshot.backgroundWrittenBytes36            self.totalWrittenBytes += snapshot.totalWrittenBytes37            self.totalFsyncCalls += snapshot.totalFsyncCalls38            self.uids[uid] = snapshot39class Document(object):40    def __init__(self, f):41        self.snapshots = []42        uptimes = [0, 0]43        for line in f:44            line = json.loads(line)45            if line['type'] != 'snapshot': continue46            activity = line['activity']47            uptime = line['uptime']48            if uptime < uptimes[0]: uptimes[0] = uptime49            if uptime > uptimes[1]: uptimes[1] = uptime50            self.snapshots.append(Snapshot(activity, uptime))51        self.runtime = datetime.timedelta(milliseconds=uptimes[1]-uptimes[0])52def merge(l1, l2):53    s1 = set(l1)54    s2 = set(l2)55    return list(s1 | s2)56thresholds = [57    (1024 * 1024 * 1024 * 1024, "TB"),58    (1024 * 1024 * 1024, "GB"),59    (1024 * 1024, "MB"),60    (1024, "KB"),61    (1, "bytes")62]63def prettyPrintBytes(n):64    for t in thresholds:65        if n >= t[0]:66            return "%.1f %s" % (n / (t[0] + 0.0), t[1])67    return "0 bytes"68# knowledge extracted from android_filesystem_config.h69wellKnownUids = {70    0 :         ["linux kernel"],71    1010 :      ["wifi"],72    1013 :      ["mediaserver"],73    1017 :      ["keystore"],74    1019 :      ["DRM server"],75    1021 :      ["GPS"],76    1023 :      ["media storage write access"],77    1036 :      ["logd"],78    1040 :      ["mediaextractor"],79    1041 :      ["audioserver"],80    1046 :      ["mediacodec"],81    1047 :      ["cameraserver"],82    1053 :      ["webview zygote"],83    1054 :      ["vehicle hal"],84    1058 :      ["tombstoned"],85    1066 :      ["statsd"],86    1067 :      ["incidentd"],87    9999 :      ["nobody"],88}89class UserActivity(object):90    def __init__(self, uid):91        self.uid = uid92        self.snapshots = []93        self.appPackages = wellKnownUids.get(uid, [])94        self.foregroundWrittenBytes = 095        self.foregroundFsyncCalls = 096        self.backgroundFsyncCalls = 097        self.backgroundWrittenBytes = 098        self.totalWrittenBytes = 099        self.totalFsyncCalls = 0100    def addSnapshot(self, snapshot):101        assert snapshot.uid == self.uid102        self.snapshots.append(snapshot)103        self.foregroundWrittenBytes += snapshot.foregroundWrittenBytes104        self.foregroundFsyncCalls += snapshot.foregroundFsyncCalls105        self.backgroundFsyncCalls += snapshot.backgroundFsyncCalls106        self.backgroundWrittenBytes += snapshot.backgroundWrittenBytes107        self.totalWrittenBytes += snapshot.totalWrittenBytes108        self.totalFsyncCalls += snapshot.totalFsyncCalls109        self.appPackages = merge(self.appPackages, snapshot.appPackages)110    def plot(self, foreground=True, background=True, total=True):111        plt.figure()112        plt.title("I/O activity for UID %s" % (self.uid))113        X = range(0,len(self.snapshots))114        minY = 0115        maxY = 0116        if foreground:117            Y = [s.foregroundWrittenBytes for s in self.snapshots]118            if any([y > 0 for y in Y]):119                plt.plot(X, Y, 'b-')120                minY = min(minY, min(Y))121                maxY = max(maxY, max(Y))122        if background:123            Y = [s.backgroundWrittenBytes for s in self.snapshots]124            if any([y > 0 for y in Y]):125                plt.plot(X, Y, 'g-')126                minY = min(minY, min(Y))127                maxY = max(maxY, max(Y))128        if total:129            Y = [s.totalWrittenBytes for s in self.snapshots]130            if any([y > 0 for y in Y]):131                plt.plot(X, Y, 'r-')132                minY = min(minY, min(Y))133                maxY = max(maxY, max(Y))134        i = int((maxY - minY) / 5)135        Yt = list(range(minY, maxY, i))136        Yl = [prettyPrintBytes(y) for y in Yt]137        plt.yticks(Yt, Yl)138        Xt = list(range(0, len(X)))139        plt.xticks(Xt)140class SystemActivity(object):141    def __init__(self):142        self.uids = {}143        self.snapshots = []144        self.foregroundWrittenBytes = 0145        self.foregroundFsyncCalls = 0146        self.backgroundFsyncCalls = 0147        self.backgroundWrittenBytes = 0148        self.totalWrittenBytes = 0149        self.totalFsyncCalls = 0150    def addSnapshot(self, snapshot):151        self.snapshots.append(snapshot)152        self.foregroundWrittenBytes += snapshot.foregroundWrittenBytes153        self.foregroundFsyncCalls += snapshot.foregroundFsyncCalls154        self.backgroundFsyncCalls += snapshot.backgroundFsyncCalls155        self.backgroundWrittenBytes += snapshot.backgroundWrittenBytes156        self.totalWrittenBytes += snapshot.totalWrittenBytes157        self.totalFsyncCalls += snapshot.totalFsyncCalls158        for uid in snapshot.uids:159            if uid not in self.uids: self.uids[uid] = UserActivity(uid)160            self.uids[uid].addSnapshot(snapshot.uids[uid])161    def loadDocument(self, doc):162        for snapshot in doc.snapshots:163            self.addSnapshot(snapshot)164    def sorted(self, f):165        return sorted(self.uids.values(), key=f, reverse=True)166    def pie(self):167        plt.figure()168        plt.title("Total disk writes per UID")169        A = [(K, self.uids[K].totalWrittenBytes) for K in self.uids]170        A = filter(lambda i: i[1] > 0, A)171        A = list(sorted(A, key=lambda i: i[1], reverse=True))172        X = [i[1] for i in A]173        L = [i[0] for i in A]174        plt.pie(X, labels=L, counterclock=False, startangle=90)175parser = argparse.ArgumentParser("Process FlashApp logs into reports")176parser.add_argument("filename")177parser.add_argument("--reportuid", action="append", default=[])178parser.add_argument("--plotuid", action="append", default=[])179parser.add_argument("--totalpie", action="store_true", default=False)180args = parser.parse_args()181class UidFilter(object):182    def __call__(self, uid):183        return False184class UidFilterAcceptAll(UidFilter):185    def __call__(self, uid):186        return True187class UidFilterAcceptSome(UidFilter):188    def __init__(self, uids):189        self.uids = uids190    def __call__(self, uid):191        return uid in self.uids192uidset = set()193plotfilter = None194for uid in args.plotuid:195    if uid == "all":196        plotfilter = UidFilterAcceptAll()197        break198    else:199        uidset.add(int(uid))200if plotfilter is None: plotfilter = UidFilterAcceptSome(uidset)201uidset = set()202reportfilter = None203for uid in args.reportuid:204    if uid == "all":205        reportfilter = UidFilterAcceptAll()206        break207    else:208        uidset.add(int(uid))209if reportfilter is None:210    if len(uidset) == 0:211        reportfilter = UidFilterAcceptAll()212    else:213        reportfilter = UidFilterAcceptSome(uidset)214document = Document(open(args.filename))215print("System runtime: %s\n" % (document.runtime))216system = SystemActivity()217system.loadDocument(document)218print("Total bytes written: %s (of which %s in foreground and %s in background)\n" % (219        prettyPrintBytes(system.totalWrittenBytes),220        prettyPrintBytes(system.foregroundWrittenBytes),221        prettyPrintBytes(system.backgroundWrittenBytes)))222writemost = filter(lambda ua: ua.totalWrittenBytes > 0, system.sorted(lambda ua: ua.totalWrittenBytes))223for entry in writemost:224    if reportfilter(entry.uid):225        print("user id %d (%s) wrote %s (of which %s in foreground and %s in background)" % (226            entry.uid,227            ','.join(entry.appPackages),228            prettyPrintBytes(entry.totalWrittenBytes),229            prettyPrintBytes(entry.foregroundWrittenBytes),230            prettyPrintBytes(entry.backgroundWrittenBytes)))231    if plotfilter(entry.uid):232        entry.plot()233        plt.show()234if args.totalpie:235    system.pie()...compress.py
Source:compress.py  
1#########################################################################2# Compresses LZSS data for PS1 game "Racing Lagoon" into bytes format   #3#########################################################################45# Racing Lagoon uses references with 11-bit offsets and 4-bit lengths,6# corresponding to a 2048-byte window, and reference lengths in the range7# 3..18 bytes. With a 1 bit identifier for references.89import sys    10import os11from pathlib import Path1213WSIZE = 0b100000000000   # window size1415MAX_REF_LEN        = 18  # maximum copy length16MIN_REF_LEN        = 3   # minimum copy length1718REPEATER_ID        = 0x7E19DOUBLE_REPEATER_ID = 0x7F2021LENGTH_MASK               = 0b0111100022OFFSET_MASK_1             = 0b0000011123REFERENCE_MASK_DOUBLE     = 0b1000000000000000242526#Compresses the given file and returns a bytes string of the compressed file27def compressChunk(inputData):28 29  writeBuffer = b''3031  #write header -  [4 BYTE LITTLE ENDIAN UNCOMPRESSED FILESIZE] 32  #                [0x0173]  33  fileSize = len(inputData)34 35  writeBuffer += bytes([ fileSize &       0xFF])36  writeBuffer += bytes([(fileSize &     0xFF00)>>8])37  writeBuffer += bytes([(fileSize &   0xFF0000)>>16])38  writeBuffer += bytes([(fileSize & 0xFF000000)>>24])39  writeBuffer += b'\x01\x73'4041  #write compressed blocks until file size reached42  43  bytesWritten = 044  windowLeft = 045  windowRight = 04647  inLiteralBlock = False48  literalBlockSize = 04950  while bytesWritten < fileSize:51    referenceFound = False52    53    #######Test for reference54    #check all valid upcoming string lengths for matches in our dictionary window55    if referenceFound == False:56      for copyLength in range(MAX_REF_LEN, MIN_REF_LEN - 1, -1):57        58        #Check if search window extends past EOF59        if bytesWritten + copyLength >= fileSize:60          continue61        else:62          windowRight = bytesWritten + copyLength6364        searchString = inputData[bytesWritten:windowRight]6566        #Search valid window for result67        searchResult = inputData.find(searchString, windowLeft, windowRight - 1)68        if searchResult != -1:69          70          #Truncate and append literal block chain, if exists71          if inLiteralBlock == True:72            writeBuffer += bytes([literalBlockSize])73            writeBuffer += inputData[bytesWritten - literalBlockSize: bytesWritten]74            75            inLiteralBlock = False76            literalBlockSize = 0777879          #create reference block with pattern 0b1YYYYZZZZZZZZZZZ (Y is length-3, Z is reference offset-1)80          lengthNibble = (copyLength-3) << 1181          offsetNibble = bytesWritten - searchResult - 182          83          referenceBlock = REFERENCE_MASK_DOUBLE | lengthNibble | offsetNibble8485          writeBuffer +=  bytes([(referenceBlock & 0xFF00)>>8]) + bytes([referenceBlock & 0xFF])86          referenceFound = True8788          #89          bytesWritten += copyLength90          91          break92  93    #######Test for repeated chars of length94    if referenceFound == False:9596      #If at least next 4 bytes are identical, write repeater block97      if bytesWritten + 4 < fileSize and inputData[bytesWritten] == inputData[bytesWritten+1] == inputData[bytesWritten + 2] == inputData[bytesWritten+3]:98        #Truncate and append literal block chain, if exists99        if inLiteralBlock == True:100          writeBuffer += bytes([literalBlockSize])101          writeBuffer += inputData[bytesWritten - literalBlockSize: bytesWritten]102          103          inLiteralBlock = False104          literalBlockSize = 0105106        count = 4107108        while len(inputData) > (bytesWritten + count) and inputData[bytesWritten + count] == inputData[bytesWritten + count - 1]:109          count+=1110111        #Max length = 0xFFFF112        count = count % 0xFFFF113        114        #Double repeater block used if count is greater than 0xFF+0x04  115        if count <=0x103:116          writeBuffer += bytes([REPEATER_ID])117          writeBuffer += bytes([count-4])118        else:119          writeBuffer += bytes([DOUBLE_REPEATER_ID])120          writeBuffer += count.to_bytes(2, byteorder="little")121        122        writeBuffer += bytes([inputData[bytesWritten]])123124        referenceFound = True125        bytesWritten += count126127        128      129    #######write literal block130    if referenceFound == False:131      132      literalBlockSize += 1133      bytesWritten +=1134      inLiteralBlock = True135136      #If max literal block size reached, truncate and write block137      if literalBlockSize == REPEATER_ID - 1:138        writeBuffer += bytes([literalBlockSize])139        writeBuffer += inputData[bytesWritten - literalBlockSize: bytesWritten]140        141        inLiteralBlock = False142        literalBlockSize = 0143  144145    #update window146    if bytesWritten > WSIZE:147      windowLeft = bytesWritten - WSIZE148149  #Truncate and append literal block chain, if exists150  if inLiteralBlock == True:151    writeBuffer += bytes([literalBlockSize])152    writeBuffer += inputData[bytesWritten - literalBlockSize: bytesWritten]153154  return writeBuffer155156157158#inputfile = open("LZSSOUTRight.bin", "rb")159160#data = compressChunk(inputfile.read())161162#outputFile = open("LZSSTestOut.bin", "wb")163164#outputFile.write(data)165166#outputFile.close()
...test_lammpsdata_write.py
Source:test_lammpsdata_write.py  
1"""2Create an atoms object and write it to a lammps data file3"""4from io import StringIO5import ase.io6from .parse_lammps_data_file import lammpsdata_file_extracted_sections7from .comparison import compare_with_pytest_approx8# Relative tolerance for comparing floats with pytest.approx9REL_TOL = 1e-210def test_lammpsdata_write(atoms):11    # Write atoms object to lammpsdata file-like object12    lammpsdata_buf = StringIO()13    ase.io.write(14        lammpsdata_buf, atoms, format="lammps-data", atom_style="full", velocities=True15    )16    # Now read the output back, parse it, and compare to the original atoms17    # object attributes18    written_values = lammpsdata_file_extracted_sections(lammpsdata_buf)19    # Check cell was written correctly20    cell_written = written_values["cell"]21    cell_expected = atoms.get_cell()22    compare_with_pytest_approx(cell_written, cell_expected, REL_TOL)23    # Check that positions were written correctly24    positions_written = written_values["positions"]25    positions_expected = atoms.get_positions(wrap=True)26    compare_with_pytest_approx(positions_written, positions_expected, REL_TOL)27    # Check velocities were read in correctly28    velocities_written = written_values["velocities"]29    velocities_expected = atoms.get_velocities()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
