Best Python code snippet using lisa_python
features2strmatrix.py
Source:features2strmatrix.py  
1# import itertools as it2import os.path3import re4# import nltk.corpus as corpus5import numpy as np6import pandas as pd7import sklearn.linear_model as ln8# from nltk import PorterStemmer9from pandas import DataFrame10from sklearn import cross_validation11from src.algos.utils import RMSE_NORMALIZED12from src.features.sets import blank13from src.features.sets import boolean_columns14# from src.features.sets import get_syn15# from src.features.sets import stop_words16from src.features.extract_count_features import process_str17from src.features.extract_count_features import load_features118def to_set(x):19    if type(x) is set: return x20    return blank21def to_str(x):22    if type(x) is str: return x23    if type(x) is unicode: return x24    return u''25def column_transformer(x, combine=True):26    if x in boolean_columns: return 'combined_boolean'27    c = str(x).lower()28    c = c.decode('utf-8', 'ignore')29    if combine:30        if re.search(31                '(:?width|height|depth|length|size|thickness|capacity|diameter|\(in\.\)|\(ft\.\)|\(mm\))',32                c) is not None:33            return 'combined_size'34        if re.search('watt', c) is not None:35            return 'combined_watt'36        if re.search('volt', c) is not None:37            return 'combined_volt'38        if re.search('(:?weight|\(lb\.\))', c) is not None:39            return 'combined_weight'40        if re.search('brand', c) is not None:41            return 'combined_brand'42        if re.search('(:?color|rgb value)', c) is not None:43            return 'combined_color'44        if re.search('material', c) is not None:45            return 'combined_material'46        if re.search('temperature', c) is not None:47            return 'combined_temperature'48        if re.search('type', c) is not None:49            return 'combined_type'50        if re.search('(:?time|\(hours\)|\(min\.\))', c) is not None:51            return 'combined_time'52        if re.search('(:?number of|count|# of)', c) is not None:53            return 'combined_count'54        if re.search('(?:degree|angle|\(deg\))', c) is not None:55            return 'combined_angle'56        if re.search('(:?\(sq\.? ft\.?\)|square foot|\(sq\. in\.\))', c) is not None:57            return 'combined_area'58        if re.search('(?:\(\w?hz\)|frequency|\(rpm\)|\(gpm\)|\(cfm\)|\(mph\)|speed|/hour|/min|per min)', c) is not None:59            return 'combined_speed'60        if re.search('(?:\(db\)|noise)', c) is not None:61            return 'combined_noise'62        if re.search('\([^\(\)]+\)$', c) is not None:63            return 'combined_measurements'64        if re.search('/', c) is not None:65            return 'combined_type'66    c = re.sub('bullet\d+', 'bullet', c)67    return c68def value_transformer(c, v, skip_stop_words=True, enable_stemming=True):69    if c in boolean_columns:70        v = str(v).lower()71        if v.startswith('y'):72            v = c73        else:74            return ''75    av = process_str(s=v, stemming=enable_stemming, skip_stop_words=skip_stop_words)76    return av77def product2attrs(product_to_trace=None, combine=True, skip_stop_words=True, enable_stemming=True):78    if product_to_trace is None: product_to_trace = {}79    file_name = './dataset/product_to_attributes.'80    if combine: file_name += 'combined.'81    if skip_stop_words: file_name += 'stop.'82    if enable_stemming: file_name += 'stemming.'83    file_name += 'csv'84    if os.path.isfile(file_name):85        print 'load data from file'86        rs = pd.read_csv(file_name, encoding='utf-8', index_col=0)87        print 'loaded', file_name, '->', rs.shape88        return rs89    attrs = pd.read_csv('./dataset/attributes.csv')90    attrs = attrs[attrs['product_uid'] == attrs['product_uid']]91    descrs = pd.read_csv('./dataset/product_descriptions.csv')92    descrs = descrs[descrs['product_uid'] == descrs['product_uid']]93    print 'attributes:', attrs.shape94    colls = attrs['name'].apply(lambda c: column_transformer(c, combine)).unique()95    print 'attributes columns:', len(colls)96    # noinspection PyUnresolvedReferences97    product_ids = [int(x) for x in pd.concat([attrs['product_uid'], descrs['product_uid']]).unique()]98    print 'unique ids:', len(product_ids)99    rs = DataFrame(index=product_ids, columns=np.hstack(colls))100    rs.index.names = ['product_uid']101    print 'process attrs'102    for index, row in attrs.iterrows():103        if index % 100000 == 0: print index,104        id = int(row['product_uid'])105        cc = column_transformer(row['name'], combine)106        is_trace_enabled = id in product_to_trace107        if is_trace_enabled:108            print109            print row['name'], id, '->', row['value']110        cv = value_transformer(row['name'], row['value'], skip_stop_words, enable_stemming)111        current = rs.at[id, cc]112        rs.at[id, cc] = (to_str(current) + ' ' + cv).strip()113        if is_trace_enabled: print cc, id, '->', rs.at[id, cc]114    print115    print 'descriptions :', descrs.shape116    for index, row in descrs.iterrows():117        if index % 10000 == 0: print index,118        id = int(row['product_uid'])119        if id not in rs.index: continue120        is_trace_enabled = id in product_to_trace121        if is_trace_enabled:122            print123            print 'product_description', id, '->', row['product_description']124        current = rs.at[id, 'bullet']125        rs.at[id, 'bullet'] = (to_str(current) + ' ' +126                               value_transformer('bullet', row['product_description'],127                                                 skip_stop_words, enable_stemming)).strip()128        if is_trace_enabled: print 'bullet', id, '->', rs.at[id, 'bullet']129    print130    print 'store data into file'131    rs.to_csv(file_name, encoding='utf-8')132    print 'result:', rs.shape, '->', file_name133    return rs134def count_words(data, search):135    if type(data) is not unicode: return 0136    return len(set(data.split(' ')) & search)137def count_words_vectorized(s):138    if type(s[0]) is not unicode: return 0139    if type(s[1]) is not unicode: return 0140    return len(set(s[0].split(' ')) & set(s[1].split(' ')))141def search_in_title(s):142    if type(s[0]) is not unicode: return 0143    if type(s[1]) is not unicode: return 0144    return float(s[0] in s[1])145def search_in_bullet(s, p_to_a):146    if type(s[1]) is not unicode: return 0147    return float(s[1] in p_to_a.loc[int(s[0]), 'bullet'])148def safe_len(s):149    if type(s) is unicode: return len(s.split(' '))150    return 0151def prepare_word_set(data_file='train', product_to_trace=None, skip_stop_words=True, enable_stemming=True):152    if product_to_trace is None: product_to_trace = set([])153    file_name = './dataset/word_set.' + data_file + '.'154    if skip_stop_words:155        file_name += 'stop.'156    if enable_stemming:157        file_name += 'stemming.'158    file_name += 'csv'159    if os.path.isfile(file_name):160        print 'load', data_file, 'data from file'161        data = pd.read_csv(file_name, encoding='utf-8')162        print 'loaded', data.shape, '->', file_name163        return data164    data = pd.read_csv('./dataset/' + data_file + '.csv')165    columns = ['id', 'product_uid', 'product_title', 'search_set']166    if 'relevance' in data.columns: columns.append('relevance')167    x = DataFrame(columns=columns)168    x['id'] = data['id']169    x['product_uid'] = data['product_uid']170    if 'relevance' in data.columns:171        x['relevance'] = data['relevance']172    for index, row in data.iterrows():173        if index % 10000 == 0: print index,174        pid = int(row['product_uid'])175        oid = int(row['id'])176        is_trace_enabled = pid in product_to_trace177        if is_trace_enabled:178            print179            print 'search term', pid, '(', oid, ')', '[', row['search_term'], ']'180        x.at[index, 'search_set'] = value_transformer('search_set', row['search_term'],181                                                      skip_stop_words, enable_stemming)182        if is_trace_enabled: print 'search set', pid, '(', oid, ')', x.at[index, 'search_set']183        if is_trace_enabled: print 'product title', pid, '(', oid, ')', '[', row['product_title'], ']'184        x.at[index, 'product_title'] = value_transformer('product_title', row['product_title'],185                                                         skip_stop_words, enable_stemming)186        if is_trace_enabled: print 'product title', pid, '(', oid, ')', '[', x.at[index, 'product_title'], ']'187    print188    print 'store word set'189    x.to_csv(file_name, encoding='utf-8', index=None)190    print 'stored', x.shape, '->', file_name191    return x192def ration(s):193    if s[1] == 0: return 0194    return s[0] / s[1]195def add_ration(features):196    print 'process ration:', 'product_title',197    features['product_title/ratio/'] = features[['product_title', 'search_len']].apply(ration, axis=1,198                                                                                       reduce=True, raw=True)199    print 'search_in_title',200    features['search_in_title/ratio/'] = features[['product_title', 'search_len']].apply(ration, axis=1,201                                                                                         reduce=True, raw=True)202    print 'search_in_bullet',203    features['search_in_bullet/ratio/'] = features[['product_title', 'search_len']].apply(ration, axis=1,204                                                                                          reduce=True, raw=True)205    print 'bullet'206    features['bullet/ratio/'] = features[['product_title', 'search_len']].apply(ration, axis=1,207                                                                                reduce=True, raw=True)208    print 'ratio finish'209def add_indicators(features):210    indicators = ['combined_size', 'combined_material', 'combined_brand', 'combined_color', 'combined_weight',211                  'combined_watt', 'combined_volt', 'combined_type']212    print 'process indicators:',213    for ind in indicators:214        print ind,215        features[ind + '/indicator/'] = features[ind].apply(lambda x: float(x > 0))216def integrate_with_anton(features):217    print 'integrate with anton'218    anton_train = pd.read_csv('./dataset/good_ft_2_train.csv', index_col='id')219    anton_test = pd.read_csv('./dataset/good_ft_2_test.csv', index_col='id')220    if 'relevance' in features.columns:221        anton = anton_train222        features = features[features.columns[features.columns != 'relevance']]223    else:224        anton = anton_test225    features = pd.merge(anton, features, right_on='id', left_index=True)226    print 'finish integrating'227    return features228def match_features(p_to_a=None, data_file='train'):229    file_name = './dataset/raw_features.' + data_file + '.csv'230    if os.path.isfile(file_name):231        print 'load', data_file, 'data from file'232        features = pd.read_csv(file_name)233        print 'loaded', features.shape, '->', file_name234        return features235    if p_to_a is None: p_to_a = product2attrs()236    data = prepare_word_set(data_file)237    print 'build start data frame'238    attrs = p_to_a.columns239    features = DataFrame(columns=attrs)240    features['id'] = data['id']241    if 'relevance' in data.columns: features['relevance'] = data['relevance']242    print 'calculate search_len'243    features['search_len'] = data['search_set'].apply(safe_len)244    print 'calculate product_title'245    features['product_title'] = data[['product_title', 'search_set']].apply(count_words_vectorized, axis=1,246                                                                            reduce=True, raw=True)247    print 'calculate search_in_title'248    features['search_in_title'] = data[['search_set', 'product_title']].apply(search_in_title, axis=1,249                                                                              reduce=True, raw=True)250    print 'calculate search_in_bullet'251    features['search_in_bullet'] = data[['product_uid', 'search_set']].apply(lambda s: search_in_bullet(s, p_to_a)252                                                                             , axis=1, reduce=True, raw=True)253    print 'process attributes'254    features = features.fillna(0.0)255    tmp = np.zeros((data.shape[0], len(attrs)))256    for index, row in data.iterrows():257        if index % 10000 == 0: print index,258        pid = row['product_uid']259        search_set = row['search_set']260        if type(search_set) is not unicode: continue261        search_set = set(search_set.split(' '))262        values = p_to_a.loc[pid]263        tmp[index] = values.apply(lambda d: count_words(d, search_set))264    print265    print 'integrate features with attributes'266    features[attrs] = tmp267    add_ration(features)268    add_indicators(features)269    features = integrate_with_anton(features)270    print 'store features'271    features.to_csv(file_name, index=None)272    print 'stored', features.shape, '->', file_name273    return features274def features_to_x(features):275    columns = features.columns276    columns = columns[np.all([columns[:] != 'relevance',277                              columns[:] != 'id'], axis=0)]278    x = features[columns]279    return x280def zero_normalization(features, merge=True):281    file_name = './dataset/zero_normalization.csv'282    if os.path.isfile(file_name):283        print 'load', file_name, 'data from file'284        indexes = pd.Series.from_csv(file_name)285        if 'relevance' not in features.columns:286            indexes = indexes[indexes.index != 'relevance']287        print 'loaded', indexes.shape, '->', file_name288    else:289        if 'relevance' not in features.columns: raise Exception('process train features before test')290        indexes = features.apply(np.sum, axis=0) > 0291        print 'store indexes'292        indexes.to_csv(file_name)293        print 'stored', indexes.shape, '->', file_name294    if merge:295        features = features.copy(deep=True)296        features['bullet'] += features[features.columns[indexes == False]].apply(np.sum, axis=1)297    features = features[features.columns[indexes]]298    print 'zero normalized', features.shape299    return features300def fill_start_mask(mask, start_mask):301    if start_mask is None: return mask302    if type(start_mask) is str:303        file_name = './dataset/mask.' + start_mask + '.csv'304        if not os.path.isfile(file_name): raise Exception('can not find start mask')305        print 'load', file_name, 'data from file'306        start_mask = pd.Series.from_csv(file_name)307        print 'loaded', start_mask.shape, '->', file_name308        for col in mask.index:309            if col in start_mask.index:310                mask[col] = start_mask[col]311        print 'start mask applied'312        return mask313    for ix in mask.index:314        if ix in start_mask: mask[ix] = 'F'315    return mask316def select_features(mask_name, features, cls=ln.LinearRegression(normalize=True), allow_merge=False, start_mask=None):317    features = features.copy(deep=True)318    file_name = './dataset/mask.' + mask_name + '.csv'319    changes = 0320    if os.path.isfile(file_name):321        print 'load', file_name, 'data from file'322        mask = pd.Series.from_csv(file_name)323        if 'relevance' not in features.columns:324            mask = mask[mask.index != 'relevance']325        print 'loaded', mask.shape, '->', file_name326        col_to_merge = features.columns[mask == 'M']327        if len(col_to_merge) > 0:328            features['bullet'] += features[col_to_merge].apply(np.sum, axis=1)329        result_features = features[features.columns[mask == 'F']]330        return result_features331    print 'source', features.shape332    mask = features.loc[features.index[0]].apply(lambda x: 'D')333    if 'relevance' in mask.index: mask['relevance'] = 'F'334    if 'id' in mask.index: mask['id'] = 'F'335    mask = fill_start_mask(mask, start_mask)336    y = features['relevance']337    if len(features_to_x(features[features.columns[mask == 'F']]).columns) > 0:338        score = cross_validation.cross_val_score(cls, features_to_x(features[features.columns[mask == 'F']])339                                                 , y, scoring=RMSE_NORMALIZED, cv=5).mean()340    else:341        score = -100000342    print 'start score', score343    for i, feature in enumerate(mask.index[mask == 'D']):344        print 'add', feature,345        mask[feature] = 'F'346        filtered = features[features.columns[mask == 'F']]347        print filtered.shape348        s = cross_validation.cross_val_score(cls, features_to_x(filtered), y, scoring=RMSE_NORMALIZED, cv=5).mean()349        print 'calculated score', s,350        if s > score:351            score = s352            changes += 1353            print 'accept feature', feature354        else:355            mask[feature] = 'D'356            print 'reject feature', feature357    print 'remove features', score358    for feature in mask.index[mask == 'F']:359        if feature in {'relevance', 'id'}: continue360        print 'remove', feature,361        mask[feature] = 'D'362        filtered = features[features.columns[mask == 'F']]363        print filtered.shape364        s = cross_validation.cross_val_score(cls, features_to_x(filtered), y, scoring=RMSE_NORMALIZED, cv=5).mean()365        print 'calculated score', s,366        if s > score:367            score = s368            changes += 1369            print 'reject feature', feature370        else:371            mask[feature] = 'F'372            print 'rollback feature', feature373    if allow_merge:374        print 'merge features', score375        unmerged = {'relevance', 'id', 'bullet', 'search_len', 'synonyms'}376        for i, feature in enumerate(mask.index):377            if feature in unmerged: continue378            print 'merge', feature,379            backup = mask[feature]380            mask[feature] = 'M'381            features['bullet'] += features[feature]382            filtered = features[features.columns[mask == 'F']]383            print filtered.shape384            s = cross_validation.cross_val_score(cls, features_to_x(filtered), y, scoring=RMSE_NORMALIZED, cv=5).mean()385            print 'calculated score', s,386            if s > score:387                score = s388                print 'merge feature', feature389            elif (s == score) and (backup == 'D'):390                print 'merge feature', feature391            else:392                mask[feature] = backup393                features['bullet'] -= features[feature]394                print 'rollback feature', feature395    result_features = features[features.columns[mask == 'F']]396    print 'store', mask.shape, '->', file_name397    mask.to_csv(file_name)398    print 'result score', score, result_features.shape, 'changes', changes399    return result_features400def apply_search_len(s):401    if s['search_len'] == 0: return 0402    return float(s[0]) / s['search_len']403def normalize_search_len(features):404    features = features.copy()405    print 'normalization by search_len', features.shape, '->',406    for i, col in enumerate(features.columns):407        if col in {'id', 'relevance', 'search_len'}: continue408        features[col + '/slennorm/'] = features[[col, 'search_len']].apply(apply_search_len, axis=1)409    print features.shape...test_logger.py
Source:test_logger.py  
...89        self.assertTrue(self.test_logger.is_error_enabled())90        self.assertTrue(self.test_logger.is_warning_enabled())91        self.assertTrue(self.test_logger.is_info_enabled())92        self.assertTrue(self.test_logger.is_debug_enabled())93        self.assertTrue(self.test_logger.is_trace_enabled())9495        self.test_logger.set_level( Logger.TRACE )96        self.assertTrue(self.test_logger.is_fatal_enabled())97        self.assertTrue(self.test_logger.is_error_enabled())98        self.assertTrue(self.test_logger.is_warning_enabled())99        self.assertTrue(self.test_logger.is_info_enabled())100        self.assertTrue(self.test_logger.is_debug_enabled())101        self.assertTrue(self.test_logger.is_trace_enabled())102        103        self.test_logger.set_level( Logger.DEBUG )104        self.assertTrue(self.test_logger.is_fatal_enabled())105        self.assertTrue(self.test_logger.is_error_enabled())106        self.assertTrue(self.test_logger.is_warning_enabled())107        self.assertTrue(self.test_logger.is_info_enabled())108        self.assertTrue(self.test_logger.is_debug_enabled())109        self.assertFalse(self.test_logger.is_trace_enabled())110        111        self.test_logger.set_level( Logger.INFO )112        self.assertTrue(self.test_logger.is_fatal_enabled())113        self.assertTrue(self.test_logger.is_error_enabled())114        self.assertTrue(self.test_logger.is_warning_enabled())115        self.assertTrue(self.test_logger.is_info_enabled())116        self.assertFalse(self.test_logger.is_debug_enabled())117        self.assertFalse(self.test_logger.is_trace_enabled())118        119        self.test_logger.set_level( Logger.WARNING )120        self.assertTrue(self.test_logger.is_fatal_enabled())121        self.assertTrue(self.test_logger.is_error_enabled())122        self.assertTrue(self.test_logger.is_warning_enabled())123        self.assertFalse(self.test_logger.is_info_enabled())124        self.assertFalse(self.test_logger.is_debug_enabled())125        self.assertFalse(self.test_logger.is_trace_enabled())126        127        self.test_logger.set_level( Logger.ERROR )128        self.assertTrue(self.test_logger.is_fatal_enabled())129        self.assertTrue(self.test_logger.is_error_enabled())130        self.assertFalse(self.test_logger.is_warning_enabled())131        self.assertFalse(self.test_logger.is_info_enabled())132        self.assertFalse(self.test_logger.is_debug_enabled())133        self.assertFalse(self.test_logger.is_trace_enabled())134        135        self.test_logger.set_level( Logger.FATAL )136        self.assertTrue(self.test_logger.is_fatal_enabled())137        self.assertFalse(self.test_logger.is_error_enabled())138        self.assertFalse(self.test_logger.is_warning_enabled())139        self.assertFalse(self.test_logger.is_info_enabled())140        self.assertFalse(self.test_logger.is_debug_enabled())141        self.assertFalse(self.test_logger.is_trace_enabled())142        143        self.test_logger.set_level( Logger.OFF )144        self.assertFalse(self.test_logger.is_fatal_enabled())145        self.assertFalse(self.test_logger.is_error_enabled())146        self.assertFalse(self.test_logger.is_warning_enabled())147        self.assertFalse(self.test_logger.is_info_enabled())148        self.assertFalse(self.test_logger.is_debug_enabled())149        self.assertFalse(self.test_logger.is_trace_enabled())150        151        # Now test to see if a change to the logging level in one log affects other logs152        new_logger = ESAPI.logger("test_num2" )153        self.test_logger.set_level( Logger.OFF )154        new_logger.set_level( Logger.INFO )155        self.assertFalse(self.test_logger.is_fatal_enabled())156        self.assertFalse(self.test_logger.is_error_enabled())157        self.assertFalse(self.test_logger.is_warning_enabled())158        self.assertFalse(self.test_logger.is_info_enabled())159        self.assertFalse(self.test_logger.is_debug_enabled())160        self.assertFalse(self.test_logger.is_trace_enabled())161        162        self.assertTrue(new_logger.is_fatal_enabled())163        self.assertTrue(new_logger.is_error_enabled())164        self.assertTrue(new_logger.is_warning_enabled())165        self.assertTrue(new_logger.is_info_enabled())166        self.assertFalse(new_logger.is_debug_enabled())167        self.assertFalse(new_logger.is_trace_enabled())168        169    def test_info(self):170        """171        Test of info method, of class esapi.Logger.172        """173        self.test_logger.info(Logger.SECURITY_SUCCESS, "test message")174        self.test_logger.info(Logger.SECURITY_SUCCESS, "test message", None)175        self.test_logger.info(Logger.SECURITY_SUCCESS, "%3escript%3f test message", None)176        self.test_logger.info(Logger.SECURITY_SUCCESS, "<script> test message", None)177        178    def test_trace(self):179        """180        Test of trace method, of class esapi.Logger.181        """
...websocket_transport.py
Source:websocket_transport.py  
1import json2import logging3from asyncio import ensure_future4from typing import Any, List5from lime_python import SessionCompression, SessionEncryption, Transport6from websockets.client import WebSocketClientProtocol, connect7from websockets.exceptions import ConnectionClosed8class WebSocketTransport(Transport):9    """WebSocket transport implementation."""10    def __init__(self, is_trace_enabled: bool = False) -> None:11        super().__init__(SessionCompression.NONE, SessionEncryption.NONE)12        self.is_trace_enabled = is_trace_enabled13        logger = print14        if logging.root.level <= logging.DEBUG:15            logger = logging.getLogger()16        self.logger = logger17        self.websocket: WebSocketClientProtocol = None18    async def open_async(self, uri: str = None) -> None:  # noqa: D10219        if self.websocket and self.websocket.open:20            err = ValueError('Cannot open an already open connection')21            self.on_error(err)22            raise err23        if uri.startswith('wss://'):24            self.encryption = SessionEncryption.TLS25        else:26            self.encryption = SessionEncryption.NONE27        self.compression = SessionCompression.NONE28        self.websocket = await connect(uri, subprotocols=['lime'])29        self.on_open()30        ensure_future(self.__message_handler_async())31    async def close_async(self) -> None:  # noqa: D10232        await self.websocket.close()33        self.on_close()34    def send(self, envelope: dict) -> None:  # noqa: D10235        self.__ensure_is_open()36        envelope_str = json.dumps(envelope)37        if self.is_trace_enabled:38            self.logger(f'WebSocket SEND: {envelope_str}')39        ensure_future(self.websocket.send(envelope_str))40    def get_supported_compression(self) -> List[str]:  # noqa: D10241        return [SessionCompression.NONE]42    def set_compression(self, compression: str) -> None:  # noqa: D10243        pass44    def get_supported_encryption(self) -> List[str]:  # noqa: D10245        return [SessionEncryption.TLS, SessionEncryption.NONE]46    def set_encryption(self, encryption: str) -> None:  # noqa: D10247        pass48    def on_envelope(self, envelope: dict) -> None:  # noqa: D10249        pass50    def on_open(self) -> None:51        """Handle on websocket open callback."""52        pass53    def on_close(self) -> None:  # noqa: WPS12354        """Handle on websocket close callback."""55        pass56    def on_error(self, err: Any) -> None:57        """Handle on websocket error callback.58        Args:59            err (Any): the exception60        """61        pass62    def __ensure_is_open(self) -> None:63        if not self.websocket or not self.websocket.open:64            err = ValueError('The connection is not open')65            self.on_error(err)66            raise err67    async def __message_handler_async(self) -> None:68        try:69            while True:  # noqa: WPS45770                message = await self.websocket.recv()71                self.__on_envelope(message)72        except ConnectionClosed:73            if self.is_trace_enabled:74                self.logger(75                    'Stopped receiving messages due to closed connection'76                )77    def __on_envelope(self, envelope: str) -> None:78        if self.is_trace_enabled:79            self.logger(f'WebSocket RECEIVE: {envelope}')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
