How to use is_unicode method in assertpy

Best Python code snippet using assertpy_python

smb_deprecated.py

Source:smb_deprecated.py Github

copy

Full Screen

1#!/usr/bin/env python2##ImmunityHeader v13################################################################################4## File : smb_deprecated.py5## Description:6## :7## Created_On : Wed Apr 11 19:19:16 CEST 20188## Created_By : X.9##10## (c) Copyright 2010, Immunity, Inc. all rights reserved.11################################################################################12"""13Note for developpers:14---------------------15This is a temporary file why we are transitionning as libsmb is being rewritten.16"""17from __future__ import with_statement18import os19import copy20import sys21import socket22import random23import logging24from struct import pack, unpack, calcsize25if '.' not in sys.path:26 sys.path.append('.')27from libs.newsmb.smbconst import *28from libs.newsmb.Struct import Struct29###30# Temporary copy (libsmb.py).31##32def extractNullTerminatedString(data, index=0, is_unicode=False):33 """34 Extracts a null-terminated string (incl. null character) from an SMB data35 packet. 
String can be OEM or Unicode.36 Return (extracted string, number of bytes processed).37 """38 null = u'\0'.encode('UTF-16LE') if is_unicode else u'\0'.encode('ASCII')39 size = len(null)40 result = ''41 for i in range(index, len(data), size):42 c = data[i:i + size]43 result += c44 if c == null:45 break46 size += len(result) - len(null)47 return (result.decode('UTF-16LE') if is_unicode else result.decode('ASCII'),48 size)49###50# OLD deprecated classes.51##52class SMBNegotiateRequestOld(Struct):53 st = [54 ['WordCount' , '<B', 0],55 ['ByteCount' , '<H', 0],56 ['Dialects' , '0s', ''],57 ]58 def __init__(self, data=None, is_unicode=False):59 Struct.__init__(self, data)60 if data is not None:61 pos = self.calcsize()62 self['Dialects'] = data[pos:]63 def pack(self):64 dialects = self['Dialects']65 if dialects == '':66 for d in [ 'PC NETWORK PROGRAM 1.0', 'LANMAN1.0',67 'Windows for Workgroups 3.1a', 'LM1.2X002', 'LANMAN2.1',68 'NT LM 0.12' ]:69 # Always ascii and always null-terminated70 dialects += '\x02' + d + '\0'71 self['ByteCount'] = len(dialects)72 return Struct.pack(self) + dialects73class SMBNegotiateResponseOld(Struct):74 st = [75 ['WordCount' , '<B', 17],76 ['DialectIndex' , '<H', 0],77 ['SecurityMode' , '<B', 0],78 ['MaxMpxCount' , '<H', 0],79 ['MaxCountVCs' , '<H', 0],80 ['MaxBufferSize' , '<L', 0],81 ['MaxRawSize' , '<L', 0],82 ['SessionKey' , '<L', 0],83 ['Capabilities' , '<L', 0],84 ['SystemTimeLow' , '<L', 0],85 ['SystemTimeHigh' , '<L', 0],86 ['ServerTimeZone' , '<H', 0],87 ['EncryptionKeyLength' , '<B', 0],88 ['ByteCount' , '<H', 0],89 ['EncryptionKey' , '0s', ''], # Only exists if SMB_FLAGS2_EXTENDED_SECURITY is not set90 ['DomainName' , '0s', u''], # Only exists if SMB_FLAGS2_EXTENDED_SECURITY is not set91 ['ServerName' , '0s', u''], # Only exists of SMB_FLAGS2_EXTENDED_SECURITY is not set92 ['ServerGuid' , '0s', ''], # Only exists if SMB_FLAGS2_EXTENDED_SECURITY is set93 ['SecurityBlob' , '0s', ''], # Only exists if SMB_FLAGS2_EXTENDED_SECURITY is 
set94 ]95 def __init__(self, data=None, is_unicode=False):96 Struct.__init__(self, data)97 if data is not None:98 # Unicode has not been negotiated yet99 if self['Capabilities'] & CAP_UNICODE:100 is_unicode = True101 pos = self.calcsize()102 if self['Capabilities'] & CAP_EXTENDED_SECURITY:103 self['ServerGuid'] = data[pos:pos + 16]104 self['SecurityBlob'] += data[pos+16:]105 else:106 self['EncryptionKey'] = data[pos:pos + self['EncryptionKeyLength']]107 pos += self['EncryptionKeyLength']108 # Must be null-terminated109 domain, length = extractNullTerminatedString(data, pos, is_unicode)110 self['DomainName'] = domain.split(u'\0')[0]111 # This is optional112 if self['ByteCount'] - self['EncryptionKeyLength'] - length > 0:113 servername = extractNullTerminatedString(data, pos+length, is_unicode)[0]114 self['ServerName'] = servername.split(u'\0')[0]115 def pack(self):116 self['EncryptionKeyLength'] = len(self['EncryptionKey'])117 if self['Capabilities'] & CAP_EXTENDED_SECURITY:118 self['ByteCount'] = self['EncryptionKeyLength'] + len(self['ServerGuid']) + len(self['SecurityBlob'])119 return Struct.pack(self) + self['EncryptionKey'] + self['ServerGuid'] + self['SecurityBlob']120 else:121 if self['Capabilities'] & CAP_UNICODE:122 is_unicode = True123 else:124 is_unicode = False125 # Null terminate fields126 domainname = self['DomainName'] + u'\0'127 servername = self['ServerName'] + u'\0'128 domainname = domainname.encode('UTF-16LE') if is_unicode else domainname.encode('ASCII')129 servername = servername.encode('UTF-16LE') if is_unicode else servername.encode('ASCII')130 self['ByteCount'] = self['EncryptionKeyLength'] + len(domainname) + len(servername)131 return Struct.pack(self) + domainname + servername132class SMBTransactionRequestOld(Struct):133 st = [134 ['WordCount' , '<B', 14], #14+SetupCount135 ['TotalParameterCount' , '<H', 0],136 ['TotalDataCount' , '<H', 0],137 ['MaxParameterCount' , '<H', 0],138 ['MaxDataCount' , '<H', 0x400],139 ['MaxSetupCount' , '<B', 
0],140 ['Reserved' , '<B', 0],141 ['Flags' , '<H', 0],142 ['Timeout' , '<L', 0],143 ['Reserved2' , '<H', 0],144 ['ParameterCount' , '<H', 0],145 ['ParameterOffset' , '<H', 0],146 ['DataCount' , '<H', 0],147 ['DataOffset' , '<H', 0],148 ['SetupCount' , '<B', 0],149 ['Reserved3' , '<B', 0],150 ['Setup' , '0s', ''],151 ['ByteCount' , '0s', ''],152 ['Name' , '0s', u''],153 ['Pad' , '0s', ''], #Pad to SHORT or LONG154 ['Parameters' , '0s', ''],155 ['Pad1' , '0s', ''], #Pad to SHORT or LONG156 ['Data' , '0s', ''],157 ]158 def __init__(self, data=None, is_unicode=False):159 Struct.__init__(self, data)160 self.is_unicode = is_unicode161 if data is not None:162 pos = self.calcsize()163 size = self['SetupCount'] * calcsize('<H')164 self['Setup'] = data[pos:pos + size]165 pos += size166 size = calcsize('<H')167 self['ByteCount'] = unpack('<H', data[pos:pos + size])[0]168 pos += size169 if is_unicode == True and (pos % 2) == 1:170 pos += 1171 name, size = extractNullTerminatedString(data, pos, is_unicode)172 self['Name'] = name.split(u'\0')[0]173 pos += size174 self['Pad'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]175 pos = self['ParameterOffset'] - SMB_HEADER_SIZE176 size = self['ParameterCount']177 self['Parameters'] = data[pos:pos + size]178 pos += size179 self['Pad1'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]180 pos = self['DataOffset'] - SMB_HEADER_SIZE181 size = self['DataCount']182 self['Data'] = data[pos:pos + size]183 def pack(self):184 self['SetupCount'] = len(self['Setup']) / calcsize('<H')185 self['WordCount'] = 14 + self['SetupCount']186 self['DataCount'] = len(self['Data'])187 if self['TotalDataCount'] == 0:188 self['TotalDataCount'] = self['DataCount']189 self['ParameterCount'] = len(self['Parameters'])190 if self['TotalParameterCount'] == 0:191 self['TotalParameterCount'] = self['ParameterCount']192 size = self.calcsize() + len(self['Setup']) + calcsize('<H')193 name = self['Name']194 name += u'\0'195 if self.is_unicode == True:196 name = 
name.encode('UTF-16LE')197 if (size % 2) == 1:198 name = '\0' + name199 else:200 name = name.encode('ASCII', 'ignore')201 size += len(name)202 if self['Pad'] == '':203 if (size % 2) == 1:204 self['Pad'] = '\0'205 size += len(self['Pad'])206 self['ParameterOffset'] = SMB_HEADER_SIZE + size207 size += len(self['Parameters'])208 if self['Pad1'] == '':209 if (size % 2) == 1:210 self['Pad1'] = '\0'211 size += len(self['Pad1'])212 self['DataOffset'] = SMB_HEADER_SIZE + size213 data = Struct.pack(self) + self['Setup'] + pack('<H', len(name) + len(self['Pad']) + len(self['Parameters']) + len(self['Pad1']) + len(self['Data'])) + name + self['Pad'] + self['Parameters'] + self['Pad1'] + self['Data']214 return data215class SMBTransactionResponseOld(Struct):216 st = [217 ['WordCount' , '<B', 10], #10+SetupCount218 ['TotalParameterCount' , '<H', 0],219 ['TotalDataCount' , '<H', 0],220 ['Reserved' , '<H', 0],221 ['ParameterCount' , '<H', 0],222 ['ParameterOffset' , '<H', 0],223 ['ParameterDisplacement' , '<H', 0],224 ['DataCount' , '<H', 0],225 ['DataOffset' , '<H', 0],226 ['DataDisplacement' , '<H', 0],227 ['SetupCount' , '<B', 0],228 ['Reserved2' , '<B', 0],229 ['Setup' , '0s', ''],230 ['ByteCount' , '0s', ''],231 ['Pad' , '0s', ''], #Pad to SHORT or LONG232 ['Parameters' , '0s', ''],233 ['Pad1' , '0s', ''], #Pad to SHORT or LONG234 ['Data' , '0s', ''],235 ]236 def __init__(self, data=None, is_unicode=False):237 Struct.__init__(self, data)238 self.is_unicode = is_unicode239 if data is not None:240 pos = self.calcsize()241 size = self['SetupCount'] * calcsize('<H')242 self['Setup'] = data[pos:pos + size]243 pos += size244 size = calcsize('<H')245 self['ByteCount'] = unpack('<H', data[pos:pos + size])[0]246 pos += size247 self['Pad'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]248 pos = self['ParameterOffset'] - SMB_HEADER_SIZE249 size = self['ParameterCount']250 self['Parameters'] = data[pos:pos + size]251 pos += size252 self['Pad1'] = data[pos:self['DataOffset'] - 
SMB_HEADER_SIZE]253 pos = self['DataOffset'] - SMB_HEADER_SIZE254 size = self['DataCount']255 self['Data'] = data[pos:pos + size]256 def pack(self):257 self['SetupCount'] = len(self['Setup']) / calcsize('<H')258 self['WordCount'] = 10 + self['SetupCount']259 self['DataCount'] = len(self['Data'])260 if self['TotalDataCount'] == 0: #XXX: If we ever want to split SMB_COM_TRANSACTION* packets, the TotalDataCount will be != DataCount --Kostya261 self['TotalDataCount'] = self['DataCount']262 self['ParameterCount'] = len(self['Parameters'])263 if self['TotalParameterCount'] == 0:264 self['TotalParameterCount'] = self['ParameterCount']265 size = self.calcsize() + len(self['Setup']) + calcsize('<H')266 if self['Pad'] == '':267 if (size % 2) == 1:268 self['Pad'] = '\0'269 size += len(self['Pad'])270 self['ParameterOffset'] = SMB_HEADER_SIZE + size271 size += len(self['Parameters'])272 if self['Pad1'] == '':273 if (size % 2) == 1:274 self['Pad1'] = '\0'275 size += len(self['Pad1'])276 self['DataOffset'] = SMB_HEADER_SIZE + size277 data = Struct.pack(self) + self['Setup'] + pack('<H', len(self['Pad']) + len(self['Parameters']) + len(self['Pad1']) + len(self['Data'])) + self['Pad'] + self['Parameters'] + self['Pad1'] + self['Data']278 return data279class SMBSessionSetupAndXRequestOld(Struct):280 st = [281 ['WordCount' , '<B', 12],282 ['AndXCommand' , '<B', 0xff],283 ['AndXReserved' , '<B', 0],284 ['AndXOffset' , '<H', 0],285 ['MaxBufferSize' , '<H', 0x1104],286 ['MaxMpxCount' , '<H', 0x10],287 ['VcNumber' , '<H', 0],288 ['SessionKey' , '<L', 0],289 ['SecurityBlobLength' , '<H', 0],290 ['Reserved' , '<L', 0],291 ['Capabilities' , '<L', CAP_EXTENDED_SECURITY|CAP_STATUS32|CAP_UNICODE|CAP_LARGE_READX|CAP_LARGE_WRITEX],292 ['ByteCount' , '<H', 0],293 ['SecurityBlob' , '0s', ''],294 ['NativeOS' , '0s', u''],295 ['NativeLANMan' , '0s', u''],296 ['PrimaryDomain' , '0s', u''],297 ]298 def __init__(self, data=None, is_unicode=False):299 Struct.__init__(self, data)300 self.is_unicode = 
is_unicode301 if data is not None:302 pos = self.calcsize()303 self['SecurityBlob'] = data[pos:pos + self['SecurityBlobLength']]304 pos += self['SecurityBlobLength']305 if self.is_unicode == True and (pos % 2) == 1:306 pos += 1307 # The following strings are always null terminated308 nativeos, size = extractNullTerminatedString(data, pos, is_unicode)309 self['NativeOS'] = nativeos.split(u'\0')[0]310 pos += size311 lanman, _ = extractNullTerminatedString(data, pos, is_unicode)312 self['NativeLANMan'] = lanman.split(u'\0')[0]313 def pack(self):314 self['SecurityBlobLength'] = len(self['SecurityBlob'])315 nativeos = self['NativeOS']316 nativelanman = self['NativeLANMan']317 primarydomain = self['PrimaryDomain']318 if self['NativeOS'] == u'':319 nativeos = u'Unix'320 if self['NativeLANMan'] == u'':321 nativelanman = u'Samba'322 nativeos += u'\0'323 nativelanman += u'\0'324 primarydomain += u'\0'325 pad = ''326 if self.is_unicode == True:327 if ((self.calcsize() + self['SecurityBlobLength']) % 2) == 1:328 pad = '\0'329 nativeos = nativeos.encode('UTF-16-LE')330 nativelanman = nativelanman.encode('UTF-16-LE')331 primarydomain = primarydomain.encode('UTF-16-LE')332 else:333 nativeos = nativeos.encode('ASCII')334 nativelanman = nativelanman.encode('ASCII')335 primarydomain = primarydomain.encode('ASCII')336 self['ByteCount'] = len(self['SecurityBlob']) + len(pad) + len(nativeos) + len(nativelanman) + len(primarydomain)337 data = Struct.pack(self)338 return data + self['SecurityBlob'] + pad + nativeos + nativelanman + primarydomain339class SMBSessionSetupAndXResponseOld(Struct):340 st = [341 ['WordCount' , '<B', 4],342 ['AndXCommand' , '<B', 0xff],343 ['AndXReserved' , '<B', 0],344 ['AndXOffset' , '<H', 0],345 ['Action' , '<H', 0],346 ['SecurityBlobLength', '<H', 0],347 ['ByteCount' , '<H', 0],348 ['SecurityBlob' , '0s', ''],349 ['NativeOS' , '0s', u''],350 ['NativeLANMan' , '0s', u''],351 ['PrimaryDomain' , '0s', u''],352 ]353 def __init__(self, data=None, 
is_unicode=False):354 Struct.__init__(self, data)355 self.is_unicode = is_unicode356 if data is not None:357 pos = self.calcsize()358 self['SecurityBlob'] = data[pos:pos + self['SecurityBlobLength']]359 pos += self['SecurityBlobLength']360 # NativeOS, NativeLANMan and PrimaryDomain are not very important.361 # Unfortunately parsing this is prone to errors and implementation362 # might vary between servers so catching exceptions is fine.363 try:364 if self.is_unicode == True and (pos % 2) == 1:365 pos += 1366 nativeos, size = extractNullTerminatedString(data, pos, is_unicode)367 self['NativeOS'] = nativeos.split(u'\0')[0]368 pos += size369 lanman, size = extractNullTerminatedString(data, pos, is_unicode)370 self['NativeLANMan'] = lanman.split(u'\0')[0]371 pos += size372 primarydomain, _ = extractNullTerminatedString(data, pos, is_unicode)373 self['PrimaryDomain'] = primarydomain.split(u'\0')[0]374 except Exception as e:375 logging.warning("Warning, parsing of the answer slightly failed: %s" % str(e))376 def pack(self):377 self['SecurityBlobLength'] = len(self['SecurityBlob'])378 nativeos = self['NativeOS']379 nativelanman = self['NativeLANMan']380 primarydomain = self['PrimaryDomain']381 if nativeos == u'':382 nativeos = u'Unix'383 if nativelanman == u'':384 nativelanman = u'Samba'385 nativeos += u'\0'386 nativelanman += u'\0'387 primarydomain += u'\0'388 pad = ''389 if self.is_unicode == True:390 if ((self.calcsize() + self['SecurityBlobLength']) % 2) == 1:391 pad = '\0'392 nativeos = nativeos.encode('UTF-16-LE')393 nativelanman = nativelanman.encode('UTF-16-LE')394 primarydomain = primarydomain.encode('UTF-16-LE')395 else:396 nativeos = nativeos.encode('ASCII')397 nativelanman = nativelanman.encode('ASCII')398 primarydomain = primarydomain.encode('ASCII')399 self['ByteCount'] = len(self['SecurityBlob']) + len(pad) + len(nativeos) + len(nativelanman) + len(primarydomain)400 data = Struct.pack(self)401 return data + self['SecurityBlob'] + pad + nativeos + 
nativelanman + primarydomain402class SMBTreeDisconnectRequestOld(Struct):403 st = [404 ['WordCount' , '<B', 0],405 ['ByteCount' , '<H', 0],406 ]407 def __init__(self, data=None, is_unicode=False):408 Struct.__init__(self, data)409class SMBTreeDisconnectResponseOld(Struct):410 st = [411 ['WordCount' , '<B', 0],412 ['ByteCount' , '<H', 0],413 ]414 def __init__(self, data=None, is_unicode=False):415 Struct.__init__(self, data)416class SMBLogoffAndXRequestOld(Struct):417 st = [418 ['WordCount' , '<B', 2],419 ['AndXCommand' , '<B', 0xff],420 ['AndXReserved' , '<B', 0],421 ['AndXOffset' , '<H', 0],422 ['ByteCount' , '<H', 0],423 ]424 def __init__(self, data=None, is_unicode=False):425 Struct.__init__(self, data)426class SMBLogoffAndXResponseOld(Struct):427 st = [428 ['WordCount' , '<B', 2],429 ['AndXCommand' , '<B', 0xff],430 ['AndXReserved' , '<B', 0],431 ['AndXOffset' , '<H', 0],432 ['ByteCount' , '<H', 0],433 ]434 def __init__(self, data=None, is_unicode=False):435 Struct.__init__(self, data)436class SMBNTTransactRequestOld(Struct):437 st = [438 ['WordCount' , '<B', 19], #19+SetupCount439 ['MaxSetupCount' , '<B', 0],440 ['Reserved1' , '<H', 0],441 ['TotalParameterCount' , '<L', 0],442 ['TotalDataCount' , '<L', 0],443 ['MaxParameterCount' , '<L', 0],444 ['MaxDataCount' , '<L', 0],445 ['ParameterCount' , '<L', 0],446 ['ParameterOffset' , '<L', 0],447 ['DataCount' , '<L', 0],448 ['DataOffset' , '<L', 0],449 ['SetupCount' , '<B', 0],450 ['Function' , '<H', 0],451 ['Setup' , '0s', ''],452 ['ByteCount' , '0s', ''],453 ['Pad1' , '0s', ''],454 ['NT_Trans_Parameters' , '0s', ''],455 ['Pad2' , '0s', ''],456 ['NT_Trans_Data' , '0s', ''],457 ]458 def __init__(self, data=None, is_unicode=False):459 Struct.__init__(self, data)460 self.is_unicode = is_unicode461 if data is not None:462 pos = self.calcsize()463 size = self['SetupCount'] * calcsize('<H')464 self['Setup'] = data[pos:pos + size]465 pos += size466 size = calcsize('<H')467 self['ByteCount'] = unpack('<H', data[pos:pos + 
size])[0]468 pos += size469 self['Pad1'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]470 pos = self['ParameterOffset'] - SMB_HEADER_SIZE471 size = self['ParameterCount']472 self['NT_Trans_Parameters'] = data[pos:pos + size]473 pos += size474 self['Pad2'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]475 pos = self['DataOffset'] - SMB_HEADER_SIZE476 size = self['DataCount']477 self['NT_Trans_Data'] = data[pos:pos + size]478 def pack(self):479 self['SetupCount'] = len(self['Setup']) / calcsize('<H')480 self['WordCount'] = 19 + self['SetupCount']481 self['DataCount'] = len(self['NT_Trans_Data'])482 if self['TotalDataCount'] == 0:483 self['TotalDataCount'] = self['DataCount']484 self['ParameterCount'] = len(self['NT_Trans_Parameters'])485 if self['TotalParameterCount'] == 0:486 self['TotalParameterCount'] = self['ParameterCount']487 size = SMB_HEADER_SIZE + self.calcsize() + len(self['Setup']) + calcsize('<H')488 if self['Pad1'] == '':489 if (size % 4) != 0:490 self['Pad1'] = '\0' * (4 - (size % 4))491 size += len(self['Pad1'])492 self['ParameterOffset'] = size493 size += len(self['NT_Trans_Parameters'])494 if self['Pad2'] == '':495 if (size % 4) != 0:496 self['Pad2'] = '\0' * (4 - (size % 4))497 size += len(self['Pad2'])498 self['DataOffset'] = size499 data = Struct.pack(self) + self['Setup'] + pack('<H', len(self['Pad1']) + len(self['NT_Trans_Parameters']) + len(self['Pad2']) + len(self['NT_Trans_Data'])) + self['Pad1'] + self['NT_Trans_Parameters'] + self['Pad2'] + self['NT_Trans_Data']500 return data501class SMBNTTransactResponseOld(Struct):502 st = [503 ['WordCount' , '<B', 18], #18+SetupCount504 ['Reserved1' , '3s', '\0' * 3],505 ['TotalParameterCount' , '<L', 0],506 ['TotalDataCount' , '<L', 0],507 ['ParameterCount' , '<L', 0],508 ['ParameterOffset' , '<L', 0],509 ['ParameterDisplacement' , '<L', 0],510 ['DataCount' , '<L', 0],511 ['DataOffset' , '<L', 0],512 ['DataDisplacement' , '<L', 0],513 ['SetupCount' , '<B', 0],514 ['Setup' , '0s', ''],515 
['ByteCount' , '0s', ''],516 ['Pad1' , '0s', ''],517 ['NT_Trans_Parameters' , '0s', ''],518 ['Pad2' , '0s', ''],519 ['NT_Trans_Data' , '0s', ''],520 ]521 def __init__(self, data=None, is_unicode=False):522 if data is not None and len(data) < self.calcsize(): #Interim server response523 self['WordCount'] = 0524 return525 Struct.__init__(self, data)526 self.is_unicode = is_unicode527 if data is not None:528 pos = self.calcsize()529 size = self['SetupCount'] * calcsize('<H')530 self['Setup'] = data[pos:pos + size]531 pos += size532 size = calcsize('<H')533 self['ByteCount'] = unpack('<H', data[pos:pos + size])[0]534 pos += size535 self['Pad1'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]536 pos = self['ParameterOffset'] - SMB_HEADER_SIZE537 size = self['ParameterCount']538 self['NT_Trans_Parameters'] = data[pos:pos + size]539 pos += size540 self['Pad2'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]541 pos = self['DataOffset'] - SMB_HEADER_SIZE542 size = self['DataCount']543 self['NT_Trans_Data'] = data[pos:pos + size]544 def pack(self):545 self['SetupCount'] = len(self['Setup']) / calcsize('<H')546 self['WordCount'] = 18 + self['SetupCount']547 self['DataCount'] = len(self['NT_Trans_Data'])548 if self['TotalDataCount'] == 0:549 self['TotalDataCount'] = self['DataCount']550 self['ParameterCount'] = len(self['NT_Trans_Parameters'])551 if self['TotalParameterCount'] == 0:552 self['TotalParameterCount'] = self['ParameterCount']553 size = SMB_HEADER_SIZE + self.calcsize() + len(self['Setup']) + calcsize('<H')554 if self['Pad1'] == '':555 if (size % 4) != 0:556 self['Pad1'] = '\0' * (4 - (size % 4))557 size += len(self['Pad1'])558 self['ParameterOffset'] = size559 size += len(self['NT_Trans_Parameters'])560 if self['Pad2'] == '':561 if (size % 4) != 0:562 self['Pad2'] = '\0' * (4 - (size % 4))563 size += len(self['Pad2'])564 self['DataOffset'] = size565 data = Struct.pack(self) + self['Setup'] + pack('<H', len(self['Pad1']) + len(self['NT_Trans_Parameters']) + 
len(self['Pad2']) + len(self['NT_Trans_Data'])) + self['Pad1'] + self['NT_Trans_Parameters'] + self['Pad2'] + self['NT_Trans_Data']566 return data567class SMBTransactionSecondaryRequestOld(Struct):568 st = [569 ['WordCount' , '<B', 8],570 ['TotalParameterCount' , '<H', 0],571 ['TotalDataCount' , '<H', 0],572 ['ParameterCount' , '<H', 0],573 ['ParameterOffset' , '<H', 0],574 ['ParameterDisplacement' , '<H', 0],575 ['DataCount' , '<H', 0],576 ['DataOffset' , '<H', 0],577 ['DataDisplacement' , '<H', 0],578 ['ByteCount' , '<H', 0],579 ['Pad1' , '0s', ''],580 ['Trans_Parameters' , '0s', ''],581 ['Pad2' , '0s', ''],582 ['Trans_Data' , '0s', ''],583 ]584 def __init__(self, data=None, is_unicode=False):585 Struct.__init__(self, data)586 self.is_unicode = is_unicode587 if data is not None:588 pos = self.calcsize()589 self['Pad1'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]590 pos = self['ParameterOffset'] - SMB_HEADER_SIZE591 size = self['ParameterCount']592 self['Trans_Parameters'] = data[pos:pos + size]593 pos += size594 self['Pad2'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]595 pos = self['DataOffset'] - SMB_HEADER_SIZE596 size = self['DataCount']597 self['Trans_Data'] = data[pos:pos + size]598 def pack(self):599 self['DataCount'] = len(self['Trans_Data'])600 if self['TotalDataCount'] == 0:601 self['TotalDataCount'] = self['DataCount']602 self['ParameterCount'] = len(self['Trans_Parameters'])603 if self['TotalParameterCount'] == 0:604 self['TotalParameterCount'] = self['ParameterCount']605 size = SMB_HEADER_SIZE + self.calcsize()606 if self['Pad1'] == '':607 if (size % 4) != 0:608 self['Pad1'] = '\0' * (4 - (size % 4))609 size += len(self['Pad1'])610 self['ParameterOffset'] = size611 size += len(self['Trans_Parameters'])612 if self['Pad2'] == '':613 if (size % 4) != 0:614 self['Pad2'] = '\0' * (4 - (size % 4))615 size += len(self['Pad2'])616 self['DataOffset'] = size617 self['ByteCount'] = len(self['Pad1']) + len(self['Trans_Parameters']) + 
len(self['Pad2']) + len(self['Trans_Data'])618 data = Struct.pack(self)619 data += self['Pad1'] + self['Trans_Parameters'] + self['Pad2'] + self['Trans_Data']620 return data621class SMBNTTransactSecondaryRequestOld(Struct):622 st = [623 ['WordCount' , '<B', 18],624 ['Reserved1' , '3s', '\0' * 3],625 ['TotalParameterCount' , '<L', 0],626 ['TotalDataCount' , '<L', 0],627 ['ParameterCount' , '<L', 0],628 ['ParameterOffset' , '<L', 0],629 ['ParameterDisplacement' , '<L', 0],630 ['DataCount' , '<L', 0],631 ['DataOffset' , '<L', 0],632 ['DataDisplacement' , '<L', 0],633 ['Reserved2' , '<B', 0],634 ['ByteCount' , '<H', 0],635 ['Pad1' , '0s', ''],636 ['NT_Trans_Parameters' , '0s', ''],637 ['Pad2' , '0s', ''],638 ['NT_Trans_Data' , '0s', ''],639 ]640 def __init__(self, data=None, is_unicode=False):641 Struct.__init__(self, data)642 self.is_unicode = is_unicode643 if data is not None:644 pos = self.calcsize()645 self['Pad1'] = data[pos:self['ParameterOffset'] - SMB_HEADER_SIZE]646 pos = self['ParameterOffset'] - SMB_HEADER_SIZE647 size = self['ParameterCount']648 self['NT_Trans_Parameters'] = data[pos:pos + size]649 pos += size650 self['Pad2'] = data[pos:self['DataOffset'] - SMB_HEADER_SIZE]651 pos = self['DataOffset'] - SMB_HEADER_SIZE652 size = self['DataCount']653 self['NT_Trans_Data'] = data[pos:pos + size]654 def pack(self):655 self['DataCount'] = len(self['NT_Trans_Data'])656 if self['TotalDataCount'] == 0:657 self['TotalDataCount'] = self['DataCount']658 self['ParameterCount'] = len(self['NT_Trans_Parameters'])659 if self['TotalParameterCount'] == 0:660 self['TotalParameterCount'] = self['ParameterCount']661 size = SMB_HEADER_SIZE + self.calcsize()662 if self['Pad1'] == '':663 if (size % 4) != 0:664 self['Pad1'] = '\0' * (4 - (size % 4))665 size += len(self['Pad1'])666 self['ParameterOffset'] = size667 size += len(self['NT_Trans_Parameters'])668 if self['Pad2'] == '':669 if (size % 4) != 0:670 self['Pad2'] = '\0' * (4 - (size % 4))671 size += len(self['Pad2'])672 
self['DataOffset'] = size673 self['ByteCount'] = len(self['Pad1']) + len(self['NT_Trans_Parameters']) + len(self['Pad2']) + len(self['NT_Trans_Data'])674 data = Struct.pack(self)675 data += self['Pad1'] + self['NT_Trans_Parameters'] + self['Pad2'] + self['NT_Trans_Data']676 return data677class SMBEchoRequestOld(Struct):678 st = [679 ['WordCount' , '<B', 0x01],680 ['EchoCount' , '<H', 0x01],681 ['ByteCount' , '<H', 0],682 ['EchoData' , '0s', ''],683 ]684 def __init__(self, data=None, is_unicode=False):685 Struct.__init__(self, data)686 if data is not None:687 pos = self.calcsize()688 self['EchoData'] = data[pos:]689 def pack(self):690 self['ByteCount'] = len(self['EchoData'])691 return Struct.pack(self) + self['EchoData']692class SMBEchoResponseOld(Struct):693 st = [694 ['WordCount' , '<B', 0x01],695 ['SequenceNumber' , '<H', 0],696 ['ByteCount' , '<H', 0],697 ['EchoData' , '0s', ''],698 ]699 def __init__(self, data=None, is_unicode=False):700 Struct.__init__(self, data)701 if data is not None:702 pos = self.calcsize()703 self['EchoData'] = data[pos:]704 def pack(self):705 self['ByteCount'] = len(self['EchoData'])...

Full Screen

Full Screen

utils_preprocess.py

Source:utils_preprocess.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2#pylint: skip-file3import sys4import os5import re6import string7from nltk import tokenize8def extract_chinese_clause(doc_str, num_split):9 sentences = []10 if doc_str != u"":11 tmp = re.split(ur"[。!?]+", doc_str)12 for e in tmp:13 if e != u"":14 sentences.append(e)15 len_doc = 016 clauses = []17 for doc in sentences:18 if doc != u"":19 tmp = re.split(ur"[,;]+", doc)20 for e in tmp:21 len_doc += len(e)22 clauses += tmp23 24 len_clause = len_doc / num_split25 result = []26 s = []27 len_s = 028 for cl in clauses[:-1]:29 if len_s > len_clause:30 result.append(u"".join(s))31 s = []32 len_s = 033 s.append(cl)34 len_s += len(cl)35 if len_s != 0:36 result.append(u"".join(s))37 if len(result) < num_split:38 result.append(clauses[-1])39 else:40 result[-1] += clauses[-1]41 return result42def read_news(src_path, has_y, is_unicode):43 abstract = ""44 contents = []45 with open(src_path, "r") as f_src:46 if has_y:47 if is_unicode:48 abstract = f_src.readline().strip().decode("utf-8")49 else:50 abstract = f_src.readline().strip()51 while 1:52 line = f_src.readline()53 if line == "\n":54 break55 if is_unicode:56 line = line.strip().decode("utf-8")57 else:58 line = line.strip()59 contents.append(line)60 return (abstract, contents)61def read_info(src_path, has_abstract, is_unicode):62 abstract = None63 with open(src_path, "r") as f_src:64 if has_abstract:65 line = f_src.readline()66 if not line:67 return None68 abstract = line.decode("utf-8").split() if is_unicode else line.split()69 contents = []70 '''71 if has_feature:72 sim = []73 cw = []74 en = []75 pos = []76 '''77 while 1:78 line = f_src.readline()79 if not line:80 break81 line = line.strip('\n')82 if line == "":83 continue84 if is_unicode:85 line = line.decode("utf-8")86 87 contents.append(line.split())88 '''89 if has_feature:90 sim.append(float(f_src.readline().strip()))91 cw.append(float(f_src.readline().strip()))92 en.append(float(f_src.readline().strip()))93 
pos.append(int(f_src.readline().strip()))94 '''95 #feature = (sim, cw, en, pos) if has_feature else None96 return abstract, contents#, feature97class washer(object):98 def __init__(self, stopwords_path, is_unicode, x_num_words, y_num_words, x_num_sents):99 self.stopwords = set()100 self.is_unicode = is_unicode101 if stopwords_path != None:102 with open(stopwords_path, "r") as f_stop:103 for line in f_stop:104 line = line.strip()105 if is_unicode:106 line = line.decode("utf-8")107 self.stopwords.add(line)108 109 self.white_str = u"" if is_unicode else ""110 self.space = ur"[\s]+"111 if is_unicode:112 self.chinese_punc = ur"[【】、·:『』「」“”《》……¥#()‘’]+"113 self.punc_table = dict((ord(char), None) for char in string.punctuation)114 self.digit_table = dict((ord(char), None) for char in string.digits)115 else:116 self.punc_table = string.maketrans("", "")117 self.X_MIN_NUM_WORDS, self.X_MAX_NUM_WORDS = x_num_words118 self.Y_MIN_NUM_WORDS, self.Y_MAX_NUM_WORDS = y_num_words119 self.MIN_NUM_SENTS, self.MAX_NUM_SENTS = x_num_sents120 def wash_word(self, word, delete_stopwords):121 word = re.sub(self.space, self.white_str, word) # delete white space122 if self.is_unicode:123 word = word.translate(self.punc_table)124 word = word.translate(self.digit_table) # delete numbers125 word = re.sub(self.chinese_punc, self.white_str, word)126 else:127 word = word.translate(self.punc_table, string.punctuation) # delete punctuations128 word = word.translate(None, string.digits) # delete numbers129 if delete_stopwords:130 if word in self.stopwords:131 return None132 if word == self.white_str:133 return None134 135 return word136 # sent should be a list137 def wash_abstract(self, sent):138 if len(sent) < self.Y_MIN_NUM_WORDS or len(sent) > self.Y_MAX_NUM_WORDS:139 return None140 141 return sent142 # sent should be a list143 def wash_sent(self, sent):144 if len(sent) < self.X_MIN_NUM_WORDS or len(sent) > self.X_MAX_NUM_WORDS:145 return None146 147 return sent148 def wash_news(self, abstract, 
contents, delete_stopwords):149 wash_status = 0150 if abstract != None:151 new_abstract = []152 tmp = abstract.lower().split()153 for e in tmp:154 e = self.wash_word(e, delete_stopwords)155 if e != None:156 new_abstract.append(e)157 new_abstract = self.wash_abstract(new_abstract)158 if new_abstract == None:159 wash_status |= 1160 else:161 new_abstract = None162 163 new_contents = []164 for s in contents:165 new_s = []166 tmp = s.lower().split()167 for e in tmp:168 e = self.wash_word(e, delete_stopwords)169 if e != None:170 new_s.append(e)171 new_s = self.wash_sent(new_s)172 new_contents.append(new_s)173 174 num_contents = 0175 for e in new_contents:176 if e != None:177 num_contents += 1178 if num_contents > self.MAX_NUM_SENTS:179 break180 if num_contents < self.MIN_NUM_SENTS:181 new_contents = None182 wash_status |= 2183 if num_contents > self.MAX_NUM_SENTS: # save first MAX_NUM_SENTS sentences184 stop = 0185 i = 0186 while stop < self.MAX_NUM_SENTS:187 if new_contents[i] != None:188 stop += 1189 i += 1190 new_contents = new_contents[0 : stop]191 192 return (new_abstract, new_contents, wash_status)193def interpolate_space(words, is_unicode):194 space = u" " if is_unicode else " "195 return space.join(words)196def remove_space(string, is_unicode):197 empty = u"" if is_unicode else " "198 return empty.join(string.split())199# split a paragraph into sentences (only for English)200def split_paragraph(sent):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run assertpy automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful