Best Python code snippet using assertpy_python
smb_deprecated.py
Source:smb_deprecated.py  
#!/usr/bin/env python
##ImmunityHeader v1
################################################################################
## File       :  smb_deprecated.py
## Description:
##            :
## Created_On :  Wed Apr 11 19:19:16 CEST 2018
## Created_By :  X.
##
## (c) Copyright 2010, Immunity, Inc. all rights reserved.
################################################################################
"""
Note for developers:
--------------------
This is a temporary file while we are transitioning, as libsmb is being
rewritten.
"""
from __future__ import with_statement
import os
import copy
import sys
import socket
import random
import logging
from struct import pack, unpack, calcsize

if '.' not in sys.path:
    sys.path.append('.')

from libs.newsmb.smbconst import *
from libs.newsmb.Struct import Struct


###
# Temporary copy (libsmb.py).
##

def extractNullTerminatedString(data, index=0, is_unicode=False):
    """
    Extracts a null-terminated string (incl. null character) from an SMB data
    packet. String can be OEM or Unicode.

    data       -- raw byte string to read from.
    index      -- offset in data at which the string starts.
    is_unicode -- when true the string is UTF-16LE (2-byte terminator),
                  otherwise ASCII (1-byte terminator).

    Return (extracted string, number of bytes processed). The extracted string
    still carries its terminator when one was found; when there is none,
    everything from index to the end of data is consumed.
    Raises UnicodeDecodeError if the consumed bytes do not decode.
    """
    encoding = 'UTF-16LE' if is_unicode else 'ASCII'
    null = u'\0'.encode(encoding)
    step = len(null)

    # Walk the buffer one code unit at a time and stop right after the first
    # terminator; slicing once avoids rebuilding the result on every step.
    end = len(data)
    for i in range(index, len(data), step):
        if data[i:i + step] == null:
            end = i + step
            break

    raw = data[index:end]
    return (raw.decode(encoding), len(raw))


###
# OLD deprecated classes.
##

class SMBNegotiateRequestOld(Struct):
    """
    Negotiate request: WordCount, ByteCount and a raw 'Dialects' blob.

    When parsing, everything after the fixed fields is stored in 'Dialects'.
    When packing with an empty 'Dialects', a built-in dialect list is sent.
    is_unicode is accepted for signature parity with the other classes but is
    not used here.
    """
    st = [
        ['WordCount' , '<B', 0],
        ['ByteCount' , '<H', 0],
        ['Dialects'  , '0s', ''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        if data is not None:
            pos = self.calcsize()
            self['Dialects'] = data[pos:]

    def pack(self):
        """Return the packed request; ByteCount is set to len(Dialects)."""
        dialects = self['Dialects']
        if dialects == '':
            default_dialects = [
                'PC NETWORK PROGRAM 1.0', 'LANMAN1.0',
                'Windows for Workgroups 3.1a', 'LM1.2X002', 'LANMAN2.1',
                'NT LM 0.12',
            ]
            # Always ascii and always null-terminated
            dialects = ''.join(['\x02' + d + '\0' for d in default_dialects])
        self['ByteCount'] = len(dialects)
        return Struct.pack(self) + dialects


class SMBNegotiateResponseOld(Struct):
    """
    Negotiate response.

    The variable part depends on the CAP_EXTENDED_SECURITY bit of
    'Capabilities': either ServerGuid + SecurityBlob, or
    EncryptionKey + DomainName + ServerName.
    """
    st = [
        ['WordCount'           , '<B', 17],
        ['DialectIndex'        , '<H', 0],
        ['SecurityMode'        , '<B', 0],
        ['MaxMpxCount'         , '<H', 0],
        ['MaxCountVCs'         , '<H', 0],
        ['MaxBufferSize'       , '<L', 0],
        ['MaxRawSize'          , '<L', 0],
        ['SessionKey'          , '<L', 0],
        ['Capabilities'        , '<L', 0],
        ['SystemTimeLow'       , '<L', 0],
        ['SystemTimeHigh'      , '<L', 0],
        ['ServerTimeZone'      , '<H', 0],
        ['EncryptionKeyLength' , '<B', 0],
        ['ByteCount'           , '<H', 0],
        ['EncryptionKey'       , '0s', ''],  # Only exists if SMB_FLAGS2_EXTENDED_SECURITY is not set
        ['DomainName'          , '0s', u''], # Only exists if SMB_FLAGS2_EXTENDED_SECURITY is not set
        ['ServerName'          , '0s', u''], # Only exists if SMB_FLAGS2_EXTENDED_SECURITY is not set
        ['ServerGuid'          , '0s', ''],  # Only exists if SMB_FLAGS2_EXTENDED_SECURITY is set
        ['SecurityBlob'        , '0s', ''],  # Only exists if SMB_FLAGS2_EXTENDED_SECURITY is set
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        if data is not None:
            # Unicode has not been negotiated yet
            if self['Capabilities'] & CAP_UNICODE:
                is_unicode = True
            pos = self.calcsize()
            if self['Capabilities'] & CAP_EXTENDED_SECURITY:
                self['ServerGuid'] = data[pos:pos + 16]
                self['SecurityBlob'] = data[pos + 16:]
            else:
                self['EncryptionKey'] = data[pos:pos + self['EncryptionKeyLength']]
                pos += self['EncryptionKeyLength']
                # Must be null-terminated
                domain, length = extractNullTerminatedString(data, pos, is_unicode)
                self['DomainName'] = domain.split(u'\0')[0]
                # This is optional
                if self['ByteCount'] - self['EncryptionKeyLength'] - length > 0:
                    servername = extractNullTerminatedString(data, pos + length, is_unicode)[0]
                    self['ServerName'] = servername.split(u'\0')[0]

    def pack(self):
        """Return the packed response, updating EncryptionKeyLength and ByteCount."""
        self['EncryptionKeyLength'] = len(self['EncryptionKey'])
        if self['Capabilities'] & CAP_EXTENDED_SECURITY:
            self['ByteCount'] = (self['EncryptionKeyLength'] + len(self['ServerGuid'])
                                 + len(self['SecurityBlob']))
            return (Struct.pack(self) + self['EncryptionKey'] + self['ServerGuid']
                    + self['SecurityBlob'])

        is_unicode = bool(self['Capabilities'] & CAP_UNICODE)
        encoding = 'UTF-16LE' if is_unicode else 'ASCII'
        # Null terminate fields
        domainname = (self['DomainName'] + u'\0').encode(encoding)
        servername = (self['ServerName'] + u'\0').encode(encoding)
        self['ByteCount'] = self['EncryptionKeyLength'] + len(domainname) + len(servername)
        # The key is counted in ByteCount/EncryptionKeyLength and __init__
        # reads it back from this position, so it has to be emitted here.
        return Struct.pack(self) + self['EncryptionKey'] + domainname + servername


class SMBTransactionRequestOld(Struct):
    """
    Transaction request: fixed header, Setup words, a 2-byte byte count, then
    Name, Pad, Parameters, Pad1 and Data.

    ParameterOffset and DataOffset are stored relative to the start of the SMB
    header, hence the SMB_HEADER_SIZE adjustments.
    """
    st = [
        ['WordCount'           , '<B', 14], #14+SetupCount
        ['TotalParameterCount' , '<H', 0],
        ['TotalDataCount'      , '<H', 0],
        ['MaxParameterCount'   , '<H', 0],
        ['MaxDataCount'        , '<H', 0x400],
        ['MaxSetupCount'       , '<B', 0],
        ['Reserved'            , '<B', 0],
        ['Flags'               , '<H', 0],
        ['Timeout'             , '<L', 0],
        ['Reserved2'           , '<H', 0],
        ['ParameterCount'      , '<H', 0],
        ['ParameterOffset'     , '<H', 0],
        ['DataCount'           , '<H', 0],
        ['DataOffset'          , '<H', 0],
        ['SetupCount'          , '<B', 0],
        ['Reserved3'           , '<B', 0],
        ['Setup'               , '0s', ''],
        ['ByteCount'           , '0s', ''],
        ['Name'                , '0s', u''],
        ['Pad'                 , '0s', ''], #Pad to SHORT or LONG
        ['Parameters'          , '0s', ''],
        ['Pad1'                , '0s', ''], #Pad to SHORT or LONG
        ['Data'                , '0s', ''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        self.is_unicode = is_unicode
        if data is not None:
            pos = self.calcsize()
            size = self['SetupCount'] * calcsize('<H')
            self['Setup'] = data[pos:pos + size]
            pos += size
            size = calcsize('<H')
            self['ByteCount'] = unpack('<H', data[pos:pos + size])[0]
            pos += size
            # Unicode names start on an even offset
            if is_unicode and (pos % 2) == 1:
                pos += 1
            name, size = extractNullTerminatedString(data, pos, is_unicode)
            self['Name'] = name.split(u'\0')[0]
            pos += size
            self['Pad'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]
            pos = self['ParameterOffset'] - SMB_HEADER_SIZE
            size = self['ParameterCount']
            self['Parameters'] = data[pos:pos + size]
            pos += size
            self['Pad1'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]
            pos = self['DataOffset'] - SMB_HEADER_SIZE
            size = self['DataCount']
            self['Data'] = data[pos:pos + size]

    def pack(self):
        """Return the packed request, recomputing counts, pads and offsets."""
        # Floor division: SetupCount must stay an integer.
        self['SetupCount'] = len(self['Setup']) // calcsize('<H')
        self['WordCount'] = 14 + self['SetupCount']
        self['DataCount'] = len(self['Data'])
        if self['TotalDataCount'] == 0:
            self['TotalDataCount'] = self['DataCount']
        self['ParameterCount'] = len(self['Parameters'])
        if self['TotalParameterCount'] == 0:
            self['TotalParameterCount'] = self['ParameterCount']

        size = self.calcsize() + len(self['Setup']) + calcsize('<H')
        name = self['Name'] + u'\0'
        if self.is_unicode:
            name = name.encode('UTF-16LE')
            if (size % 2) == 1:
                name = '\0' + name
        else:
            name = name.encode('ASCII', 'ignore')
        size += len(name)

        # Caller-supplied pads are kept; otherwise align to 2 bytes.
        if self['Pad'] == '' and (size % 2) == 1:
            self['Pad'] = '\0'
        size += len(self['Pad'])
        self['ParameterOffset'] = SMB_HEADER_SIZE + size
        size += len(self['Parameters'])
        if self['Pad1'] == '' and (size % 2) == 1:
            self['Pad1'] = '\0'
        size += len(self['Pad1'])
        self['DataOffset'] = SMB_HEADER_SIZE + size

        payload = (name + self['Pad'] + self['Parameters'] + self['Pad1']
                   + self['Data'])
        # `pack` below is struct.pack (module-level name), not this method.
        return Struct.pack(self) + self['Setup'] + pack('<H', len(payload)) + payload


class SMBTransactionResponseOld(Struct):
    """
    Transaction response: fixed header, Setup words, a 2-byte byte count, then
    Pad, Parameters, Pad1 and Data.
    """
    st = [
        ['WordCount'             , '<B', 10], #10+SetupCount
        ['TotalParameterCount'   , '<H', 0],
        ['TotalDataCount'        , '<H', 0],
        ['Reserved'              , '<H', 0],
        ['ParameterCount'        , '<H', 0],
        ['ParameterOffset'       , '<H', 0],
        ['ParameterDisplacement' , '<H', 0],
        ['DataCount'             , '<H', 0],
        ['DataOffset'            , '<H', 0],
        ['DataDisplacement'      , '<H', 0],
        ['SetupCount'            , '<B', 0],
        ['Reserved2'             , '<B', 0],
        ['Setup'                 , '0s', ''],
        ['ByteCount'             , '0s', ''],
        ['Pad'                   , '0s', ''], #Pad to SHORT or LONG
        ['Parameters'            , '0s', ''],
        ['Pad1'                  , '0s', ''], #Pad to SHORT or LONG
        ['Data'                  , '0s', ''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        self.is_unicode = is_unicode
        if data is not None:
            pos = self.calcsize()
            size = self['SetupCount'] * calcsize('<H')
            self['Setup'] = data[pos:pos + size]
            pos += size
            size = calcsize('<H')
            self['ByteCount'] = unpack('<H', data[pos:pos + size])[0]
            pos += size
            self['Pad'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]
            pos = self['ParameterOffset'] - SMB_HEADER_SIZE
            size = self['ParameterCount']
            self['Parameters'] = data[pos:pos + size]
            pos += size
            self['Pad1'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]
            pos = self['DataOffset'] - SMB_HEADER_SIZE
            size = self['DataCount']
            self['Data'] = data[pos:pos + size]

    def pack(self):
        """Return the packed response, recomputing counts, pads and offsets."""
        # Floor division: SetupCount must stay an integer.
        self['SetupCount'] = len(self['Setup']) // calcsize('<H')
        self['WordCount'] = 10 + self['SetupCount']
        self['DataCount'] = len(self['Data'])
        if self['TotalDataCount'] == 0: #XXX: If we ever want to split SMB_COM_TRANSACTION* packets, the TotalDataCount will be != DataCount --Kostya
            self['TotalDataCount'] = self['DataCount']
        self['ParameterCount'] = len(self['Parameters'])
        if self['TotalParameterCount'] == 0:
            self['TotalParameterCount'] = self['ParameterCount']

        size = self.calcsize() + len(self['Setup']) + calcsize('<H')
        # Caller-supplied pads are kept; otherwise align to 2 bytes.
        if self['Pad'] == '' and (size % 2) == 1:
            self['Pad'] = '\0'
        size += len(self['Pad'])
        self['ParameterOffset'] = SMB_HEADER_SIZE + size
        size += len(self['Parameters'])
        if self['Pad1'] == '' and (size % 2) == 1:
            self['Pad1'] = '\0'
        size += len(self['Pad1'])
        self['DataOffset'] = SMB_HEADER_SIZE + size

        payload = self['Pad'] + self['Parameters'] + self['Pad1'] + self['Data']
        # `pack` below is struct.pack (module-level name), not this method.
        return Struct.pack(self) + self['Setup'] + pack('<H', len(payload)) + payload


class SMBSessionSetupAndXRequestOld(Struct):
    """
    Session setup AndX request: fixed header followed by SecurityBlob and the
    null-terminated strings NativeOS, NativeLANMan and PrimaryDomain.
    """
    st = [
        ['WordCount'          , '<B', 12],
        ['AndXCommand'        , '<B', 0xff],
        ['AndXReserved'       , '<B', 0],
        ['AndXOffset'         , '<H', 0],
        ['MaxBufferSize'      , '<H', 0x1104],
        ['MaxMpxCount'        , '<H', 0x10],
        ['VcNumber'           , '<H', 0],
        ['SessionKey'         , '<L', 0],
        ['SecurityBlobLength' , '<H', 0],
        ['Reserved'           , '<L', 0],
        ['Capabilities'       , '<L', CAP_EXTENDED_SECURITY|CAP_STATUS32|CAP_UNICODE|CAP_LARGE_READX|CAP_LARGE_WRITEX],
        ['ByteCount'          , '<H', 0],
        ['SecurityBlob'       , '0s', ''],
        ['NativeOS'           , '0s', u''],
        ['NativeLANMan'       , '0s', u''],
        ['PrimaryDomain'      , '0s', u''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        self.is_unicode = is_unicode
        if data is not None:
            pos = self.calcsize()
            self['SecurityBlob'] = data[pos:pos + self['SecurityBlobLength']]
            pos += self['SecurityBlobLength']
            if self.is_unicode and (pos % 2) == 1:
                pos += 1
            # The following strings are always null terminated
            nativeos, size = extractNullTerminatedString(data, pos, is_unicode)
            self['NativeOS'] = nativeos.split(u'\0')[0]
            pos += size
            lanman, _ = extractNullTerminatedString(data, pos, is_unicode)
            self['NativeLANMan'] = lanman.split(u'\0')[0]
            # NOTE(review): PrimaryDomain is emitted by pack() but is not
            # parsed back here (the response class does parse it).

    def pack(self):
        """Return the packed request, updating SecurityBlobLength and ByteCount."""
        self['SecurityBlobLength'] = len(self['SecurityBlob'])
        # Empty NativeOS / NativeLANMan fall back to fixed defaults.
        nativeos = self['NativeOS'] if self['NativeOS'] != u'' else u'Unix'
        nativelanman = self['NativeLANMan'] if self['NativeLANMan'] != u'' else u'Samba'
        primarydomain = self['PrimaryDomain']
        nativeos      += u'\0'
        nativelanman  += u'\0'
        primarydomain += u'\0'

        pad = ''
        if self.is_unicode:
            # Unicode strings start on an even offset
            if ((self.calcsize() + self['SecurityBlobLength']) % 2) == 1:
                pad = '\0'
            encoding = 'UTF-16-LE'
        else:
            encoding = 'ASCII'
        nativeos = nativeos.encode(encoding)
        nativelanman = nativelanman.encode(encoding)
        primarydomain = primarydomain.encode(encoding)

        trailer = self['SecurityBlob'] + pad + nativeos + nativelanman + primarydomain
        self['ByteCount'] = len(trailer)
        return Struct.pack(self) + trailer


class SMBSessionSetupAndXResponseOld(Struct):
    """
    Session setup AndX response: fixed header followed by SecurityBlob and the
    null-terminated strings NativeOS, NativeLANMan and PrimaryDomain.
    """
    st = [
        ['WordCount'         , '<B', 4],
        ['AndXCommand'       , '<B', 0xff],
        ['AndXReserved'      , '<B', 0],
        ['AndXOffset'        , '<H', 0],
        ['Action'            , '<H', 0],
        ['SecurityBlobLength', '<H', 0],
        ['ByteCount'         , '<H', 0],
        ['SecurityBlob'      , '0s', ''],
        ['NativeOS'          , '0s', u''],
        ['NativeLANMan'      , '0s', u''],
        ['PrimaryDomain'     , '0s', u''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        self.is_unicode = is_unicode
        if data is not None:
            pos = self.calcsize()
            self['SecurityBlob'] = data[pos:pos + self['SecurityBlobLength']]
            pos += self['SecurityBlobLength']
            # NativeOS, NativeLANMan and PrimaryDomain are not very important.
            # Unfortunately parsing this is prone to errors and implementation
            # might vary between servers so catching exceptions is fine.
            try:
                if self.is_unicode and (pos % 2) == 1:
                    pos += 1
                nativeos, size = extractNullTerminatedString(data, pos, is_unicode)
                self['NativeOS'] = nativeos.split(u'\0')[0]
                pos += size
                lanman, size = extractNullTerminatedString(data, pos, is_unicode)
                self['NativeLANMan'] = lanman.split(u'\0')[0]
                pos += size
                primarydomain, _ = extractNullTerminatedString(data, pos, is_unicode)
                self['PrimaryDomain'] = primarydomain.split(u'\0')[0]
            except Exception as e:
                logging.warning("Warning, parsing of the answer slightly failed: %s" % str(e))

    def pack(self):
        """Return the packed response, updating SecurityBlobLength and ByteCount."""
        self['SecurityBlobLength'] = len(self['SecurityBlob'])
        # Empty NativeOS / NativeLANMan fall back to fixed defaults.
        nativeos = self['NativeOS'] if self['NativeOS'] != u'' else u'Unix'
        nativelanman = self['NativeLANMan'] if self['NativeLANMan'] != u'' else u'Samba'
        primarydomain = self['PrimaryDomain']
        nativeos      += u'\0'
        nativelanman  += u'\0'
        primarydomain += u'\0'

        pad = ''
        if self.is_unicode:
            # Unicode strings start on an even offset
            if ((self.calcsize() + self['SecurityBlobLength']) % 2) == 1:
                pad = '\0'
            encoding = 'UTF-16-LE'
        else:
            encoding = 'ASCII'
        nativeos = nativeos.encode(encoding)
        nativelanman = nativelanman.encode(encoding)
        primarydomain = primarydomain.encode(encoding)

        trailer = self['SecurityBlob'] + pad + nativeos + nativelanman + primarydomain
        self['ByteCount'] = len(trailer)
        return Struct.pack(self) + trailer


class SMBTreeDisconnectRequestOld(Struct):
    """Tree disconnect request: WordCount and ByteCount only."""
    st = [
        ['WordCount' , '<B', 0],
        ['ByteCount' , '<H', 0],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)


class SMBTreeDisconnectResponseOld(Struct):
    """Tree disconnect response: WordCount and ByteCount only."""
    st = [
        ['WordCount' , '<B', 0],
        ['ByteCount' , '<H', 0],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)


class SMBLogoffAndXRequestOld(Struct):
    """Logoff AndX request: AndX header fields and ByteCount only."""
    st = [
        ['WordCount'    , '<B', 2],
        ['AndXCommand'  , '<B', 0xff],
        ['AndXReserved' , '<B', 0],
        ['AndXOffset'   , '<H', 0],
        ['ByteCount'    , '<H', 0],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)


class SMBLogoffAndXResponseOld(Struct):
    """Logoff AndX response: AndX header fields and ByteCount only."""
    st = [
        ['WordCount'    , '<B', 2],
        ['AndXCommand'  , '<B', 0xff],
        ['AndXReserved' , '<B', 0],
        ['AndXOffset'   , '<H', 0],
        ['ByteCount'    , '<H', 0],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)


class SMBNTTransactRequestOld(Struct):
    """
    NT transact request: fixed header, Setup words, a 2-byte byte count, then
    Pad1, NT_Trans_Parameters, Pad2 and NT_Trans_Data (pads align to 4 bytes).
    """
    st = [
        ['WordCount'           , '<B', 19], #19+SetupCount
        ['MaxSetupCount'       , '<B', 0],
        ['Reserved1'           , '<H', 0],
        ['TotalParameterCount' , '<L', 0],
        ['TotalDataCount'      , '<L', 0],
        ['MaxParameterCount'   , '<L', 0],
        ['MaxDataCount'        , '<L', 0],
        ['ParameterCount'      , '<L', 0],
        ['ParameterOffset'     , '<L', 0],
        ['DataCount'           , '<L', 0],
        ['DataOffset'          , '<L', 0],
        ['SetupCount'          , '<B', 0],
        ['Function'            , '<H', 0],
        ['Setup'               , '0s', ''],
        ['ByteCount'           , '0s', ''],
        ['Pad1'                , '0s', ''],
        ['NT_Trans_Parameters' , '0s', ''],
        ['Pad2'                , '0s', ''],
        ['NT_Trans_Data'       , '0s', ''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        self.is_unicode = is_unicode
        if data is not None:
            pos = self.calcsize()
            size = self['SetupCount'] * calcsize('<H')
            self['Setup'] = data[pos:pos + size]
            pos += size
            size = calcsize('<H')
            self['ByteCount'] = unpack('<H', data[pos:pos + size])[0]
            pos += size
            self['Pad1'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]
            pos = self['ParameterOffset'] - SMB_HEADER_SIZE
            size = self['ParameterCount']
            self['NT_Trans_Parameters'] = data[pos:pos + size]
            pos += size
            self['Pad2'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]
            pos = self['DataOffset'] - SMB_HEADER_SIZE
            size = self['DataCount']
            self['NT_Trans_Data'] = data[pos:pos + size]

    def pack(self):
        """Return the packed request, recomputing counts, pads and offsets."""
        # Floor division: SetupCount must stay an integer.
        self['SetupCount'] = len(self['Setup']) // calcsize('<H')
        self['WordCount'] = 19 + self['SetupCount']
        self['DataCount'] = len(self['NT_Trans_Data'])
        if self['TotalDataCount'] == 0:
            self['TotalDataCount'] = self['DataCount']
        self['ParameterCount'] = len(self['NT_Trans_Parameters'])
        if self['TotalParameterCount'] == 0:
            self['TotalParameterCount'] = self['ParameterCount']

        # Offsets here already include the SMB header.
        size = SMB_HEADER_SIZE + self.calcsize() + len(self['Setup']) + calcsize('<H')
        # Caller-supplied pads are kept; otherwise align to 4 bytes.
        if self['Pad1'] == '' and (size % 4) != 0:
            self['Pad1'] = '\0' * (4 - (size % 4))
        size += len(self['Pad1'])
        self['ParameterOffset'] = size
        size += len(self['NT_Trans_Parameters'])
        if self['Pad2'] == '' and (size % 4) != 0:
            self['Pad2'] = '\0' * (4 - (size % 4))
        size += len(self['Pad2'])
        self['DataOffset'] = size

        payload = (self['Pad1'] + self['NT_Trans_Parameters'] + self['Pad2']
                   + self['NT_Trans_Data'])
        # `pack` below is struct.pack (module-level name), not this method.
        return Struct.pack(self) + self['Setup'] + pack('<H', len(payload)) + payload


class SMBNTTransactResponseOld(Struct):
    """
    NT transact response: fixed header, Setup words, a 2-byte byte count, then
    Pad1, NT_Trans_Parameters, Pad2 and NT_Trans_Data (pads align to 4 bytes).
    """
    st = [
        ['WordCount'             , '<B', 18], #18+SetupCount
        ['Reserved1'             , '3s', '\0' * 3],
        ['TotalParameterCount'   , '<L', 0],
        ['TotalDataCount'        , '<L', 0],
        ['ParameterCount'        , '<L', 0],
        ['ParameterOffset'       , '<L', 0],
        ['ParameterDisplacement' , '<L', 0],
        ['DataCount'             , '<L', 0],
        ['DataOffset'            , '<L', 0],
        ['DataDisplacement'      , '<L', 0],
        ['SetupCount'            , '<B', 0],
        ['Setup'                 , '0s', ''],
        ['ByteCount'             , '0s', ''],
        ['Pad1'                  , '0s', ''],
        ['NT_Trans_Parameters'   , '0s', ''],
        ['Pad2'                  , '0s', ''],
        ['NT_Trans_Data'         , '0s', ''],
    ]

    def __init__(self, data=None, is_unicode=False):
        if data is not None and len(data) < self.calcsize(): #Interim server response
            # NOTE(review): this runs before Struct.__init__ and leaves
            # self.is_unicode unset; assumes Struct supports item assignment
            # on an uninitialised instance -- confirm against Struct.
            self['WordCount'] = 0
            return
        Struct.__init__(self, data)
        self.is_unicode = is_unicode
        if data is not None:
            pos = self.calcsize()
            size = self['SetupCount'] * calcsize('<H')
            self['Setup'] = data[pos:pos + size]
            pos += size
            size = calcsize('<H')
            self['ByteCount'] = unpack('<H', data[pos:pos + size])[0]
            pos += size
            self['Pad1'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]
            pos = self['ParameterOffset'] - SMB_HEADER_SIZE
            size = self['ParameterCount']
            self['NT_Trans_Parameters'] = data[pos:pos + size]
            pos += size
            self['Pad2'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]
            pos = self['DataOffset'] - SMB_HEADER_SIZE
            size = self['DataCount']
            self['NT_Trans_Data'] = data[pos:pos + size]

    def pack(self):
        """Return the packed response, recomputing counts, pads and offsets."""
        # Floor division: SetupCount must stay an integer.
        self['SetupCount'] = len(self['Setup']) // calcsize('<H')
        self['WordCount'] = 18 + self['SetupCount']
        self['DataCount'] = len(self['NT_Trans_Data'])
        if self['TotalDataCount'] == 0:
            self['TotalDataCount'] = self['DataCount']
        self['ParameterCount'] = len(self['NT_Trans_Parameters'])
        if self['TotalParameterCount'] == 0:
            self['TotalParameterCount'] = self['ParameterCount']

        # Offsets here already include the SMB header.
        size = SMB_HEADER_SIZE + self.calcsize() + len(self['Setup']) + calcsize('<H')
        # Caller-supplied pads are kept; otherwise align to 4 bytes.
        if self['Pad1'] == '' and (size % 4) != 0:
            self['Pad1'] = '\0' * (4 - (size % 4))
        size += len(self['Pad1'])
        self['ParameterOffset'] = size
        size += len(self['NT_Trans_Parameters'])
        if self['Pad2'] == '' and (size % 4) != 0:
            self['Pad2'] = '\0' * (4 - (size % 4))
        size += len(self['Pad2'])
        self['DataOffset'] = size

        payload = (self['Pad1'] + self['NT_Trans_Parameters'] + self['Pad2']
                   + self['NT_Trans_Data'])
        # `pack` below is struct.pack (module-level name), not this method.
        return Struct.pack(self) + self['Setup'] + pack('<H', len(payload)) + payload


class SMBTransactionSecondaryRequestOld(Struct):
    """
    Transaction secondary request: fixed header (incl. ByteCount) followed by
    Pad1, Trans_Parameters, Pad2 and Trans_Data (pads align to 4 bytes).
    """
    st = [
        ['WordCount'             , '<B', 8],
        ['TotalParameterCount'   , '<H', 0],
        ['TotalDataCount'        , '<H', 0],
        ['ParameterCount'        , '<H', 0],
        ['ParameterOffset'       , '<H', 0],
        ['ParameterDisplacement' , '<H', 0],
        ['DataCount'             , '<H', 0],
        ['DataOffset'            , '<H', 0],
        ['DataDisplacement'      , '<H', 0],
        ['ByteCount'             , '<H', 0],
        ['Pad1'                  , '0s', ''],
        ['Trans_Parameters'      , '0s', ''],
        ['Pad2'                  , '0s', ''],
        ['Trans_Data'            , '0s', ''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        self.is_unicode = is_unicode
        if data is not None:
            pos = self.calcsize()
            self['Pad1'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]
            pos = self['ParameterOffset'] - SMB_HEADER_SIZE
            size = self['ParameterCount']
            self['Trans_Parameters'] = data[pos:pos + size]
            pos += size
            self['Pad2'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]
            pos = self['DataOffset'] - SMB_HEADER_SIZE
            size = self['DataCount']
            self['Trans_Data'] = data[pos:pos + size]

    def pack(self):
        """Return the packed request, recomputing counts, pads and offsets."""
        self['DataCount'] = len(self['Trans_Data'])
        if self['TotalDataCount'] == 0:
            self['TotalDataCount'] = self['DataCount']
        self['ParameterCount'] = len(self['Trans_Parameters'])
        if self['TotalParameterCount'] == 0:
            self['TotalParameterCount'] = self['ParameterCount']

        # Offsets here already include the SMB header.
        size = SMB_HEADER_SIZE + self.calcsize()
        # Caller-supplied pads are kept; otherwise align to 4 bytes.
        if self['Pad1'] == '' and (size % 4) != 0:
            self['Pad1'] = '\0' * (4 - (size % 4))
        size += len(self['Pad1'])
        self['ParameterOffset'] = size
        size += len(self['Trans_Parameters'])
        if self['Pad2'] == '' and (size % 4) != 0:
            self['Pad2'] = '\0' * (4 - (size % 4))
        size += len(self['Pad2'])
        self['DataOffset'] = size

        payload = (self['Pad1'] + self['Trans_Parameters'] + self['Pad2']
                   + self['Trans_Data'])
        self['ByteCount'] = len(payload)
        return Struct.pack(self) + payload


class SMBNTTransactSecondaryRequestOld(Struct):
    """
    NT transact secondary request: fixed header (incl. ByteCount) followed by
    Pad1, NT_Trans_Parameters, Pad2 and NT_Trans_Data (pads align to 4 bytes).
    """
    st = [
        ['WordCount'             , '<B', 18],
        ['Reserved1'             , '3s', '\0' * 3],
        ['TotalParameterCount'   , '<L', 0],
        ['TotalDataCount'        , '<L', 0],
        ['ParameterCount'        , '<L', 0],
        ['ParameterOffset'       , '<L', 0],
        ['ParameterDisplacement' , '<L', 0],
        ['DataCount'             , '<L', 0],
        ['DataOffset'            , '<L', 0],
        ['DataDisplacement'      , '<L', 0],
        ['Reserved2'             , '<B', 0],
        ['ByteCount'             , '<H', 0],
        ['Pad1'                  , '0s', ''],
        ['NT_Trans_Parameters'   , '0s', ''],
        ['Pad2'                  , '0s', ''],
        ['NT_Trans_Data'         , '0s', ''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        self.is_unicode = is_unicode
        if data is not None:
            pos = self.calcsize()
            self['Pad1'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]
            pos = self['ParameterOffset'] - SMB_HEADER_SIZE
            size = self['ParameterCount']
            self['NT_Trans_Parameters'] = data[pos:pos + size]
            pos += size
            self['Pad2'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]
            pos = self['DataOffset'] - SMB_HEADER_SIZE
            size = self['DataCount']
            self['NT_Trans_Data'] = data[pos:pos + size]

    def pack(self):
        """Return the packed request, recomputing counts, pads and offsets."""
        self['DataCount'] = len(self['NT_Trans_Data'])
        if self['TotalDataCount'] == 0:
            self['TotalDataCount'] = self['DataCount']
        self['ParameterCount'] = len(self['NT_Trans_Parameters'])
        if self['TotalParameterCount'] == 0:
            self['TotalParameterCount'] = self['ParameterCount']

        # Offsets here already include the SMB header.
        size = SMB_HEADER_SIZE + self.calcsize()
        # Caller-supplied pads are kept; otherwise align to 4 bytes.
        if self['Pad1'] == '' and (size % 4) != 0:
            self['Pad1'] = '\0' * (4 - (size % 4))
        size += len(self['Pad1'])
        self['ParameterOffset'] = size
        size += len(self['NT_Trans_Parameters'])
        if self['Pad2'] == '' and (size % 4) != 0:
            self['Pad2'] = '\0' * (4 - (size % 4))
        size += len(self['Pad2'])
        self['DataOffset'] = size

        payload = (self['Pad1'] + self['NT_Trans_Parameters'] + self['Pad2']
                   + self['NT_Trans_Data'])
        self['ByteCount'] = len(payload)
        return Struct.pack(self) + payload


class SMBEchoRequestOld(Struct):
    """Echo request: WordCount, EchoCount, ByteCount and a raw EchoData blob."""
    st = [
        ['WordCount' , '<B', 0x01],
        ['EchoCount' , '<H', 0x01],
        ['ByteCount' , '<H', 0],
        ['EchoData'  , '0s', ''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        if data is not None:
            pos = self.calcsize()
            self['EchoData'] = data[pos:]

    def pack(self):
        """Return the packed request; ByteCount is set to len(EchoData)."""
        self['ByteCount'] = len(self['EchoData'])
        return Struct.pack(self) + self['EchoData']


class SMBEchoResponseOld(Struct):
    """Echo response: WordCount, SequenceNumber, ByteCount and a raw EchoData blob."""
    st = [
        ['WordCount'      , '<B', 0x01],
        ['SequenceNumber' , '<H', 0],
        ['ByteCount'      , '<H', 0],
        ['EchoData'       , '0s', ''],
    ]

    def __init__(self, data=None, is_unicode=False):
        Struct.__init__(self, data)
        if data is not None:
            pos = self.calcsize()
            self['EchoData'] = data[pos:]

    def pack(self):
        self['ByteCount'] = len(self['EchoData'])
        # NOTE(review): the listing is cut off here ("..."); the remainder of
        # this method is not visible. Presumably it returns
        # Struct.pack(self) + self['EchoData'] like SMBEchoRequestOld.pack --
        # confirm against the original file.
utils_preprocess.py
Source:utils_preprocess.py  
# -*- coding: utf-8 -*-
#pylint: skip-file
import sys
import os
import re
import string
from nltk import tokenize


def extract_chinese_clause(doc_str, num_split):
    """Split ``doc_str`` into groups of consecutive clauses.

    The text is split into sentences on the sentence-delimiter character
    class, every sentence is split into clauses on the clause-delimiter
    character class, and consecutive clauses are concatenated into groups.
    A group is closed once it holds more than ``total_length // num_split``
    characters. The last clause becomes its own group while fewer than
    ``num_split`` groups exist; otherwise it is appended to the final group.

    Args:
        doc_str: unicode text to split.
        num_split: number of groups to aim for (used as a divisor, so it
            must be non-zero when the text contains at least one clause).

    Returns:
        A list of unicode strings; an empty list when no clause is found.
    """
    # NOTE(review): the delimiter characters in the two patterns below look
    # mis-encoded in this copy of the file (UTF-8 bytes rendered as Latin-1).
    # They are reproduced as found -- verify against the original source.
    sentences = [s for s in re.split(u"[ãï¼ï¼]+", doc_str) if s != u""]

    clauses = []
    for sentence in sentences:
        clauses += re.split(u"[ï¼ï¼]+", sentence)

    # Nothing to group: the code below indexes clauses[-1] and would raise
    # IndexError on empty input.
    if not clauses:
        return []

    len_doc = sum(len(clause) for clause in clauses)
    # Floor division keeps the Python 2 integer-division result on Python 3.
    len_clause = len_doc // num_split

    result = []
    group = []
    len_group = 0
    for clause in clauses[:-1]:
        if len_group > len_clause:
            result.append(u"".join(group))
            group = []
            len_group = 0
        group.append(clause)
        len_group += len(clause)
    if len_group != 0:
        result.append(u"".join(group))

    if len(result) < num_split:
        result.append(clauses[-1])
    else:
        result[-1] += clauses[-1]
    return result


def read_news(src_path, has_y, is_unicode):
    """Read a news file up to its first blank line.

    Args:
        src_path: path of the file to read.
        has_y: when true, the first line is read as the abstract.
        is_unicode: when true, every line is decoded with
            ``.decode("utf-8")`` (this relies on the lines being byte
            strings, as they are on Python 2).

    Returns:
        ``(abstract, contents)`` where ``abstract`` is the stripped first
        line (``""`` when ``has_y`` is false) and ``contents`` is the list of
        stripped lines that follow.
    """
    abstract = ""
    contents = []
    with open(src_path, "r") as f_src:
        if has_y:
            abstract = f_src.readline().strip()
            if is_unicode:
                abstract = abstract.decode("utf-8")
        while True:
            line = f_src.readline()
            # readline() returns "" at end of file; without this check a file
            # that lacks a terminating blank line would loop forever.
            if not line or line == "\n":
                break
            line = line.strip()
            if is_unicode:
                line = line.decode("utf-8")
            contents.append(line)
    return (abstract, contents)


def read_info(src_path, has_abstract, is_unicode):
    """Read a whitespace-tokenised file into token lists.

    Args:
        src_path: path of the file to read.
        has_abstract: when true, the first line is read as the abstract.
        is_unicode: when true, lines are decoded with ``.decode("utf-8")``
            before being split (relies on byte-string lines, as on Python 2).

    Returns:
        ``None`` when ``has_abstract`` is true and the file is empty.
        Otherwise ``(abstract, contents)``: ``abstract`` is the token list of
        the first line (``None`` when ``has_abstract`` is false) and
        ``contents`` holds one token list per non-empty remaining line.
    """
    abstract = None
    with open(src_path, "r") as f_src:
        if has_abstract:
            line = f_src.readline()
            if not line:
                return None
            abstract = line.decode("utf-8").split() if is_unicode else line.split()
        contents = []
        # NOTE: reading of per-sentence features (sim, cw, en, pos) was
        # disabled in this version of the function.
        while True:
            line = f_src.readline()
            if not line:
                break
            line = line.strip('\n')
            if line == "":
                continue
            if is_unicode:
                line = line.decode("utf-8")
            contents.append(line.split())
    return abstract, contents


class washer(object):
    """Clean words, sentences and whole news items against length limits.

    Attributes set by ``__init__``:
        stopwords: set of stop words loaded from ``stopwords_path``.
        is_unicode: whether inputs are handled as unicode text.
        X_MIN_NUM_WORDS / X_MAX_NUM_WORDS: allowed words per content sentence.
        Y_MIN_NUM_WORDS / Y_MAX_NUM_WORDS: allowed words per abstract.
        MIN_NUM_SENTS / MAX_NUM_SENTS: allowed number of content sentences.
    """

    def __init__(self, stopwords_path, is_unicode, x_num_words, y_num_words, x_num_sents):
        """Load stop words and store the cleaning configuration.

        Args:
            stopwords_path: file with one stop word per line, or ``None``.
            is_unicode: when true, stop words are decoded from UTF-8 and the
                unicode translation tables are used.
            x_num_words: ``(min, max)`` word count for a content sentence.
            y_num_words: ``(min, max)`` word count for the abstract.
            x_num_sents: ``(min, max)`` number of content sentences.
        """
        self.stopwords = set()
        self.is_unicode = is_unicode
        if stopwords_path is not None:
            with open(stopwords_path, "r") as f_stop:
                for line in f_stop:
                    line = line.strip()
                    if is_unicode:
                        line = line.decode("utf-8")
                    self.stopwords.add(line)

        self.white_str = u"" if is_unicode else ""
        self.space = u"[\\s]+"
        if is_unicode:
            # NOTE(review): these punctuation characters look mis-encoded in
            # this copy of the file; reproduced as found -- verify against
            # the original source.
            self.chinese_punc = u"[ããã·ï¼ããããââããâ¦â¦ï¿¥#ï¼ï¼ââ]+"
            self.punc_table = dict((ord(char), None) for char in string.punctuation)
            self.digit_table = dict((ord(char), None) for char in string.digits)
        else:
            self.punc_table = string.maketrans("", "")
        self.X_MIN_NUM_WORDS, self.X_MAX_NUM_WORDS = x_num_words
        self.Y_MIN_NUM_WORDS, self.Y_MAX_NUM_WORDS = y_num_words
        self.MIN_NUM_SENTS, self.MAX_NUM_SENTS = x_num_sents

    def wash_word(self, word, delete_stopwords):
        """Strip whitespace, punctuation and digits from ``word``.

        Args:
            word: the token to clean.
            delete_stopwords: when true, stop words are rejected.

        Returns:
            The cleaned word, or ``None`` when nothing is left or the cleaned
            word is a stop word (and ``delete_stopwords`` is true).
        """
        word = re.sub(self.space, self.white_str, word)  # delete white space
        if self.is_unicode:
            word = word.translate(self.punc_table)
            word = word.translate(self.digit_table)  # delete numbers
            word = re.sub(self.chinese_punc, self.white_str, word)
        else:
            # Two-argument translate(table, deletechars) is the Python 2
            # byte-string API.
            word = word.translate(self.punc_table, string.punctuation)  # delete punctuations
            word = word.translate(None, string.digits)  # delete numbers
        if delete_stopwords and word in self.stopwords:
            return None
        if word == self.white_str:
            return None
        return word

    # sent should be a list
    def wash_abstract(self, sent):
        """Return ``sent`` if its length is within the abstract limits, else ``None``."""
        if len(sent) < self.Y_MIN_NUM_WORDS or len(sent) > self.Y_MAX_NUM_WORDS:
            return None
        return sent

    # sent should be a list
    def wash_sent(self, sent):
        """Return ``sent`` if its length is within the sentence limits, else ``None``."""
        if len(sent) < self.X_MIN_NUM_WORDS or len(sent) > self.X_MAX_NUM_WORDS:
            return None
        return sent

    def _wash_tokens(self, text, delete_stopwords):
        """Lower-case ``text``, split it on whitespace and keep the words that survive ``wash_word``."""
        washed = []
        for token in text.lower().split():
            token = self.wash_word(token, delete_stopwords)
            if token is not None:
                washed.append(token)
        return washed

    def wash_news(self, abstract, contents, delete_stopwords):
        """Clean an abstract and its content sentences.

        Args:
            abstract: abstract text, or ``None`` when there is none.
            contents: iterable of sentence strings.
            delete_stopwords: forwarded to ``wash_word``.

        Returns:
            ``(new_abstract, new_contents, wash_status)``.
            ``new_abstract`` is the cleaned word list or ``None``.
            ``new_contents`` is a list with one entry per kept input sentence
            (a word list, or ``None`` for a rejected sentence), or ``None``
            when fewer than ``MIN_NUM_SENTS`` sentences are accepted.
            ``wash_status`` has bit 1 set when the abstract was rejected and
            bit 2 set when the contents were rejected.
        """
        wash_status = 0
        if abstract is not None:
            new_abstract = self.wash_abstract(
                self._wash_tokens(abstract, delete_stopwords))
            if new_abstract is None:
                wash_status |= 1
        else:
            new_abstract = None

        new_contents = [
            self.wash_sent(self._wash_tokens(sentence, delete_stopwords))
            for sentence in contents
        ]

        # Count accepted sentences, stopping as soon as the maximum is
        # exceeded (the exact surplus is not needed).
        num_contents = 0
        for entry in new_contents:
            if entry is not None:
                num_contents += 1
                if num_contents > self.MAX_NUM_SENTS:
                    break

        if num_contents < self.MIN_NUM_SENTS:
            new_contents = None
            wash_status |= 2
        elif num_contents > self.MAX_NUM_SENTS:
            # Keep everything up to and including the MAX_NUM_SENTS-th
            # accepted sentence. The slice must end at the scan position:
            # slicing at the accepted count would let rejected (None)
            # entries use up the quota.
            accepted = 0
            end = 0
            while accepted < self.MAX_NUM_SENTS:
                if new_contents[end] is not None:
                    accepted += 1
                end += 1
            new_contents = new_contents[:end]

        return (new_abstract, new_contents, wash_status)


def interpolate_space(words, is_unicode):
    """Join ``words`` with a single space (unicode separator when ``is_unicode``)."""
    space = u" " if is_unicode else " "
    return space.join(words)


def remove_space(string, is_unicode):
    """Return ``string`` with all whitespace removed.

    Note: the parameter name shadows the ``string`` module inside this
    function; it is kept for backward compatibility with keyword callers.
    """
    # The joiner must be empty in both cases; joining with " " would only
    # collapse whitespace runs instead of removing them.
    empty = u"" if is_unicode else ""
    return empty.join(string.split())


# split a paragraph into sentences (only for English)
# NOTE: the listing breaks off here in this copy -- only the signature
# "def split_paragraph(sent):" is visible, so the function is not reproduced.

# --- Page text that follows the listing (not part of the module) ---
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
