Best Python code snippet using autotest_python
read_data.py
Source:read_data.py  
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset
from pytorch_transformers import *
import torch.utils.data as Data
import pickle


class Translator:
    """Back-translation. To save time, we pre-process and save all the translated data into pickle files.
    """

    def __init__(self, path, transform_type='BackTranslation'):
        # Pre-processed German data
        with open(path + 'de_1.pkl', 'rb') as f:
            self.de = pickle.load(f)
        # Pre-processed Russian data
        with open(path + 'ru_1.pkl', 'rb') as f:
            self.ru = pickle.load(f)

    def __call__(self, ori, idx):
        out1 = self.de[idx]
        out2 = self.ru[idx]
        return out1, out2, ori


def get_data(data_path, n_labeled_per_class, unlabeled_per_class=5000, max_seq_len=256, model='bert-base-uncased', train_aug=False):
    """Read data, split the dataset, and build datasets for the dataloaders.

    Arguments:
        data_path {str} -- Path to your dataset folder: contains a train.csv and a test.csv
        n_labeled_per_class {int} -- Number of labeled data per class

    Keyword Arguments:
        unlabeled_per_class {int} -- Number of unlabeled data per class (default: {5000})
        max_seq_len {int} -- Maximum sequence length (default: {256})
        model {str} -- Model name (default: {'bert-base-uncased'})
        train_aug {bool} -- Whether to perform augmentation on the labeled training set (default: {False})
    """
    # Load the tokenizer for BERT
    tokenizer = BertTokenizer.from_pretrained(model)
    train_df = pd.read_csv(data_path + 'train.csv', header=None)
    test_df = pd.read_csv(data_path + 'test.csv', header=None)
    # Here we use only the bodies and drop the titles for classification
    train_labels = np.array([v - 1 for v in train_df[0]])
    train_text = np.array([v for v in train_df[2]])
    test_labels = np.array([u - 1 for u in test_df[0]])
    test_text = np.array([v for v in test_df[2]])
    n_labels = max(test_labels) + 1
    # Split into labeled training set, unlabeled training set, development set
    train_labeled_idxs, train_unlabeled_idxs, val_idxs = train_val_split(
        train_labels, n_labeled_per_class, unlabeled_per_class, n_labels)
    # Build the dataset class for each set
    train_labeled_dataset = loader_labeled(
        train_text[train_labeled_idxs], train_labels[train_labeled_idxs], tokenizer, max_seq_len, train_aug)
    train_unlabeled_dataset = loader_unlabeled(
        train_text[train_unlabeled_idxs], train_unlabeled_idxs, tokenizer, max_seq_len, Translator(data_path))
    val_dataset = loader_labeled(
        train_text[val_idxs], train_labels[val_idxs], tokenizer, max_seq_len)
    test_dataset = loader_labeled(
        test_text, test_labels, tokenizer, max_seq_len)
    print("#Labeled: {}, Unlabeled: {}, Val: {}, Test: {}".format(
        len(train_labeled_idxs), len(train_unlabeled_idxs), len(val_idxs), len(test_labels)))
    return train_labeled_dataset, train_unlabeled_dataset, val_dataset, test_dataset, n_labels


def train_val_split(labels, n_labeled_per_class, unlabeled_per_class, n_labels, seed=0):
    """Split the original training set into a labeled training set, an unlabeled training set, and a development set.

    Arguments:
        labels {list} -- List of labels for the original training set
        n_labeled_per_class {int} -- Number of labeled data per class
        unlabeled_per_class {int} -- Number of unlabeled data per class
        n_labels {int} -- The number of classes

    Keyword Arguments:
        seed {int} -- Random seed for np.random.shuffle (default: {0})

    Returns:
        [list] -- idxs for the labeled training set, unlabeled training set, development set
    """
    np.random.seed(seed)
    labels = np.array(labels)
    train_labeled_idxs = []
    train_unlabeled_idxs = []
    val_idxs = []
    for i in range(n_labels):
        idxs = np.where(labels == i)[0]
        np.random.shuffle(idxs)
        if n_labels == 2:
            # IMDB
            train_pool = np.concatenate((idxs[:500], idxs[5500:-2000]))
            train_labeled_idxs.extend(train_pool[:n_labeled_per_class])
            train_unlabeled_idxs.extend(idxs[500: 500 + 5000])
            val_idxs.extend(idxs[-2000:])
        elif n_labels == 10:
            # DBPedia
            train_pool = np.concatenate((idxs[:500], idxs[10500:-2000]))
            train_labeled_idxs.extend(train_pool[:n_labeled_per_class])
            train_unlabeled_idxs.extend(idxs[500: 500 + unlabeled_per_class])
            val_idxs.extend(idxs[-2000:])
        else:
            # Yahoo/AG News
            train_pool = np.concatenate((idxs[:500], idxs[5500:-2000]))
            train_labeled_idxs.extend(train_pool[:n_labeled_per_class])
            train_unlabeled_idxs.extend(idxs[500: 500 + 5000])
            val_idxs.extend(idxs[-2000:])
    np.random.shuffle(train_labeled_idxs)
    np.random.shuffle(train_unlabeled_idxs)
    np.random.shuffle(val_idxs)
    return train_labeled_idxs, train_unlabeled_idxs, val_idxs


class loader_labeled(Dataset):
    # Data loader for labeled data
    def __init__(self, dataset_text, dataset_label, tokenizer, max_seq_len, aug=False):
        self.tokenizer = tokenizer
        self.text = dataset_text
        self.labels = dataset_label
        self.max_seq_len = max_seq_len
        self.aug = aug
        self.trans_dist = {}
        if aug:
            print('Augmenting train data by back-translation through German')
            self.en2de = torch.hub.load(
                'pytorch/fairseq', 'transformer.wmt19.en-de.single_model', tokenizer='moses', bpe='fastbpe')
            self.de2en = torch.hub.load(
                'pytorch/fairseq', 'transformer.wmt19.de-en.single_model', tokenizer='moses', bpe='fastbpe')

    def __len__(self):
        return len(self.labels)

    def augment(self, text):
        # Cache translations so each sentence is back-translated only once
        if text not in self.trans_dist:
            self.trans_dist[text] = self.de2en.translate(self.en2de.translate(
                text, sampling=True, temperature=0.9), sampling=True, temperature=0.9)
        return self.trans_dist[text]

    def get_tokenized(self, text):
        tokens = self.tokenizer.tokenize(text)
        if len(tokens) > self.max_seq_len:
            tokens = tokens[:self.max_seq_len]
        length = len(tokens)
        encode_result = self.tokenizer.convert_tokens_to_ids(tokens)
        padding = [0] * (self.max_seq_len - len(encode_result))
        encode_result += padding
        return encode_result, length

    def __getitem__(self, idx):
        if self.aug:
            text = self.text[idx]
            text_aug = self.augment(text)
            text_result, text_length = self.get_tokenized(text)
            text_result2, text_length2 = self.get_tokenized(text_aug)
            return ((torch.tensor(text_result), torch.tensor(text_result2)),
                    (self.labels[idx], self.labels[idx]),
                    (text_length, text_length2))
        else:
            text = self.text[idx]
            tokens = self.tokenizer.tokenize(text)
            if len(tokens) > self.max_seq_len:
                tokens = tokens[:self.max_seq_len]
            length = len(tokens)
            encode_result = self.tokenizer.convert_tokens_to_ids(tokens)
            padding = [0] * (self.max_seq_len - len(encode_result))
            encode_result += padding
            return (torch.tensor(encode_result), self.labels[idx], length)


class loader_unlabeled(Dataset):
    # Data loader for unlabeled data
    def __init__(self, dataset_text, unlabeled_idxs, tokenizer, max_seq_len, aug=None):
        self.tokenizer = tokenizer
        self.text = dataset_text
        self.ids = unlabeled_idxs
        self.aug = aug
        self.max_seq_len = max_seq_len

    def __len__(self):
        return len(self.text)

    def get_tokenized(self, text):
        tokens = self.tokenizer.tokenize(text)
        if len(tokens) > self.max_seq_len:
            tokens = tokens[:self.max_seq_len]
        length = len(tokens)
        encode_result = self.tokenizer.convert_tokens_to_ids(tokens)
        padding = [0] * (self.max_seq_len - len(encode_result))
        encode_result += padding
        return encode_result, length

    def __getitem__(self, idx):
        if self.aug is not None:
            u, v, ori = self.aug(self.text[idx], self.ids[idx])
            encode_result_u, length_u = self.get_tokenized(u)
            encode_result_v, length_v = self.get_tokenized(v)
            encode_result_ori, length_ori = self.get_tokenized(ori)
            return ((torch.tensor(encode_result_u), torch.tensor(encode_result_v), torch.tensor(encode_result_ori)),
                    (length_u, length_v, length_ori))
        else:
            text = self.text[idx]
            encode_result, length = self.get_tokenized(text)
            return (torch.tensor(encode_result), length)  # inferred: mirrors the branches above
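A minimal usage sketch for get_data, assuming a dataset folder that contains train.csv, test.csv, and the pre-computed back-translation pickles de_1.pkl/ru_1.pkl that Translator expects; the path, label count, and batch sizes below are illustrative only:

from torch.utils.data import DataLoader

# './data/ag_news_csv/' is a hypothetical path; any folder with the files
# listed above would work the same way.
labeled_set, unlabeled_set, val_set, test_set, n_labels = get_data(
    './data/ag_news_csv/', n_labeled_per_class=10)

labeled_loader = DataLoader(labeled_set, batch_size=4, shuffle=True)
unlabeled_loader = DataLoader(unlabeled_set, batch_size=8, shuffle=True)

# Each labeled batch is (padded token ids, labels, true sequence lengths).
ids, labels, lengths = next(iter(labeled_loader))
print(ids.shape, labels, lengths)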
test_encode.py
Source:test_encode.py
from testplates import (
    struct,
    init,
    field,
    attach_codec,
    AnyInstance,
    encode,
    create_codec,
    TestplatesError,
    InvalidStructureError,
    NoCodecAvailableError,
    InaccessibleCodecError,
    AmbiguousCodecChoiceError,
)
from resultful import (
    success,
    failure,
    unwrap_success,
    unwrap_failure,
    Result,
)
from hypothesis import (
    given,
    strategies as st,
)
from .utils import (
    unreachable,
)


# noinspection PyTypeChecker
@given(data=st.binary())
def test_encode(data: bytes) -> None:
    metadata_object = object()

    # noinspection PyUnusedLocal
    def encode_function(
        metadata: object,
        instance: AnyInstance,
    ) -> Result[bytes, TestplatesError]:
        assert metadata is metadata_object
        return success(data)

    codec = create_codec(encode_function, unreachable)

    @struct
    class Person:
        pass

    attach_codec(Person, codec=codec, metadata=metadata_object)

    assert (person_result := init(Person))
    person = unwrap_success(person_result)

    assert (encode_result := encode(person))
    assert unwrap_success(encode_result) == data


# noinspection PyTypeChecker
@given(data=st.binary())
def test_encode_without_metadata(data: bytes) -> None:
    # noinspection PyUnusedLocal
    def encode_function(
        metadata: None,
        instance: AnyInstance,
    ) -> Result[bytes, TestplatesError]:
        assert metadata is None
        return success(data)

    codec = create_codec(encode_function, unreachable)

    @struct
    class Person:
        pass

    attach_codec(Person, codec=codec)

    assert (person_result := init(Person))
    person = unwrap_success(person_result)

    assert (encode_result := encode(person))
    assert unwrap_success(encode_result) == data


# noinspection PyTypeChecker
@given(data=st.binary())
def test_encode_with_using(data: bytes) -> None:
    # noinspection PyUnusedLocal
    def encode_function(
        metadata: None,
        instance: AnyInstance,
    ) -> Result[bytes, TestplatesError]:
        return success(data)

    primary = create_codec(encode_function, unreachable)
    secondary = create_codec(unreachable, unreachable)

    @struct
    class Person:
        pass

    attach_codec(Person, codec=primary)
    attach_codec(Person, codec=secondary)

    assert (person_result := init(Person))
    person = unwrap_success(person_result)

    assert (encode_result := encode(person, using=primary))
    assert unwrap_success(encode_result) is data


# noinspection PyTypeChecker
def test_encode_failure() -> None:
    error = TestplatesError()

    # noinspection PyUnusedLocal
    def encode_function(
        metadata: None,
        instance: AnyInstance,
    ) -> Result[bytes, TestplatesError]:
        return failure(error)

    codec = create_codec(encode_function, unreachable)

    @struct
    class Person:
        pass

    attach_codec(Person, codec=codec)

    assert (person_result := init(Person))
    person = unwrap_success(person_result)

    assert not (encode_result := encode(person))
    assert unwrap_failure(encode_result) is error


# noinspection PyTypeChecker
@given(name=st.text())
def test_encode_failure_invalid_structure(name: str) -> None:
    field_error = TestplatesError()
    codec = create_codec(unreachable, unreachable)

    @struct
    class Person:
        name = field(failure(field_error))

    attach_codec(Person, codec=codec)

    person = Person(name=name)
    assert not (encode_result := encode(person))

    error = unwrap_failure(encode_result)
    assert isinstance(error, InvalidStructureError)
    assert error.errors == [field_error]


# noinspection PyTypeChecker
def test_encode_failure_no_codec_available_error() -> None:
    @struct
    class Person:
        pass

    assert (person_result := init(Person))
    person = unwrap_success(person_result)

    assert not (encode_result := encode(person))

    error = unwrap_failure(encode_result)
    assert isinstance(error, NoCodecAvailableError)
    assert error.structure_type == Person


# noinspection PyTypeChecker
def test_encode_failure_inaccessible_codec_error() -> None:
    first = create_codec(unreachable, unreachable)
    second = create_codec(unreachable, unreachable)

    @struct
    class Person:
        pass

    attach_codec(Person, codec=first)

    assert (person_result := init(Person))
    person = unwrap_success(person_result)

    assert not (encode_result := encode(person, using=second))

    error = unwrap_failure(encode_result)
    assert isinstance(error, InaccessibleCodecError)
    assert error.structure_type == Person
    assert error.codecs == [first]
    assert error.using == second


# noinspection PyTypeChecker
def test_encode_failure_ambiguous_codec_choice_error() -> None:
    primary = create_codec(unreachable, unreachable)
    secondary = create_codec(unreachable, unreachable)

    @struct
    class Person:
        pass

    attach_codec(Person, codec=primary)
    attach_codec(Person, codec=secondary)

    assert (person_result := init(Person))
    person = unwrap_success(person_result)

    assert not (encode_result := encode(person))

    error = unwrap_failure(encode_result)
    assert isinstance(error, AmbiguousCodecChoiceError)
    assert error.structure_type == Person
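For context, a minimal standalone sketch of the codec workflow these tests exercise, using only the API calls seen above (create_codec, attach_codec, init, encode); Message, encode_to_bytes, and decode_stub are hypothetical names, and the decoder is deliberately left unimplemented since only encoding is shown here:

from testplates import struct, init, attach_codec, encode, create_codec, TestplatesError, AnyInstance
from resultful import success, unwrap_success, Result


def encode_to_bytes(
    metadata: None,
    instance: AnyInstance,
) -> Result[bytes, TestplatesError]:
    # Illustrative encoder: real code would serialize the instance's fields.
    return success(repr(instance).encode("utf-8"))


def decode_stub(metadata, data):
    # Decoding is out of scope for this sketch; the signature is assumed.
    raise NotImplementedError


codec = create_codec(encode_to_bytes, decode_stub)


@struct
class Message:
    pass


attach_codec(Message, codec=codec)

assert (result := init(Message))
message = unwrap_success(result)
assert (encoded := encode(message))
print(unwrap_success(encoded))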
shumeitools.py
Source:shumeitools.py
# -*- coding: utf-8 -*-
import json
import time
import urllib2

PIC_CHANNEL = [
    (1, "IMAGE", "image (default)"),
    (2, "HEAD_IMG", "user avatar"),
    (3, "CHAT_IMG", "chat image"),
]
TEXT_CHANNEL = [
    (1, "NICKNAME"),
    (2, "MESSAGE"),
    (3, "PROFILE")
]
ACCESS_KEY = "IBIIH3VkFvqAEmGC3YpJ"
TEXT_TYPE = "SOCIAL"
PIC_TYPE = "AD_PORN"


def shumei_image_detect(pic_url, timeout, user_id, channel, sex, phone):
    channel_desc = "IMAGE"
    if channel == 1:
        channel_desc = "IMAGE"
    elif channel == 2:
        channel_desc = "HEAD_IMG"
    elif channel == 3:
        channel_desc = "CHAT_IMG"
    data = {"img": pic_url, "tokenId": str(user_id), "channel": channel_desc}
    payload = {"accessKey": ACCESS_KEY, "type": PIC_TYPE, "data": data, "sex": sex, "age": 0, "phone": phone}
    body = json.dumps(payload)
    shumei_url = "http://api.fengkongcloud.com/v2/saas/anti_fraud/img"
    start_time = int(time.time() * 1000)
    request = urllib2.Request(shumei_url, body)
    shumei_result = urllib2.urlopen(request, timeout=timeout).read()
    end_time = int(time.time() * 1000)
    duration = end_time - start_time
    encode_result = json.loads(shumei_result)
    print encode_result
    if encode_result["code"] == 1100:
        risk_level = encode_result["riskLevel"]
        detail = encode_result["detail"]
    elif encode_result["code"] == 1902:
        # invalid parameters
        pass
    elif encode_result["code"] == 1903:
        # service failure
        pass
    elif encode_result["code"] == 9100:
        # insufficient balance
        pass
    elif encode_result["code"] == 9101:
        # operation not permitted
        pass
    else:
        # unknown error
        pass
    return encode_result, duration


def shumei_text_spam(text, timeout, user_id, channel, nickname, phone, ip):
    channel_desc = channel
    data = {"text": text, "tokenId": str(user_id), "channel": channel_desc, "nickname": nickname, "phone": phone, "ip": ip}
    payload = {"accessKey": ACCESS_KEY, "type": TEXT_TYPE, "data": data}
    body = json.dumps(payload)
    shumei_url = "http://api.fengkongcloud.com/v2/saas/anti_fraud/text"
    request = urllib2.Request(shumei_url, body)
    start_time = int(time.time() * 1000)
    shumei_result = urllib2.urlopen(request, timeout=timeout).read()
    end_time = int(time.time() * 1000)
    duration = end_time - start_time
    encode_result = json.loads(shumei_result)
    # shumei_result = requests.post(shumei_url, data=body, timeout=timeout)
    # encode_result = json.loads(shumei_result.text)
    return encode_result, duration  # inferred: mirrors shumei_image_detect
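A usage sketch for the text endpoint, in Python 2 to match the module; every argument value below is a placeholder, and a real call additionally needs a valid Shumei ACCESS_KEY and network access to api.fengkongcloud.com:

# Placeholder values for illustration only.
result, duration = shumei_text_spam(
    text=u"hello world",
    timeout=5,
    user_id=123456,
    channel="MESSAGE",   # one of the TEXT_CHANNEL names above
    nickname="tester",
    phone="",
    ip="127.0.0.1",
)
print "request took %d ms" % duration
if result["code"] == 1100:
    # code 1100 means the request was processed; riskLevel carries the verdict
    print result["riskLevel"]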
