Best Python code snippet using locust
data_utils.py
Source:data_utils.py  
# This code is adapted from https://github.com/tensorflow/models/tree/master/neural_gpu
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Code for working with the data"""
import random
import sys
import time

import numpy as np
from tensorflow.python.platform import gfile

import config as cnf
import task as tasks
from language.lambada import LambadaTask
from language.musicnet import Musicnet
from language.utils import LanguageTask


def find_data_task(task: str) -> LanguageTask:
    """Return the language task object registered under the name `task`.

    Raises:
        NotImplementedError: if `task` is neither "lambada" nor "musicnet".
    """
    if task == "lambada":
        return LambadaTask()
    elif task == "musicnet":
        return Musicnet()
    else:
        raise NotImplementedError("Task '{task}' not supported".format(task=task))


def get_prev_indices(n_bits):
    """Build a table of "previous index" links over `2 ** n_bits` positions.

    For every k in 1..n_bits-1 a chain is started at the first position that
    is still unassigned and continued with stride `2 ** k`; each visited
    position stores the index visited just before it in the same chain, and
    the first position of a chain stores -2. Positions that no chain reaches
    keep the initial value -1.
    """
    length = 1 << n_bits
    ptr = [-1] * length
    for k in range(1, n_bits):
        ofs = ptr.index(-1)  # first position not yet claimed by a chain
        step = 1 << k
        prev = -2  # marker for "start of chain"
        while ofs < length:
            assert ptr[ofs] == -1
            ptr[ofs] = prev
            prev = ofs
            ofs += step
    return ptr


# Per-bin read positions used by get_batch (one slot per sequence length).
train_counters = np.zeros(cnf.bin_max_len, dtype=np.int32)
test_counters = np.zeros(cnf.bin_max_len, dtype=np.int32)


def reset_counters():
    """Rebind both module-level counter arrays to fresh all-zero arrays."""
    global train_counters
    global test_counters
    train_counters = np.zeros(cnf.bin_max_len, dtype=np.int32)
    test_counters = np.zeros(cnf.bin_max_len, dtype=np.int32)


reset_counters()


def pad(length):
    """Return the first bin in `cnf.bins` that is >= `length`.

    Falls back to `cnf.forward_max` when no bin is large enough.
    """
    for b in cnf.bins:
        if b >= length:
            return b
    return cnf.forward_max


# task name -> list indexed by sequence length -> list of [input, target] pairs
train_set = {}
test_set = {}


def init(max_length=cnf.bin_max_len):
    """Reset both data sets to `max_length` empty bins for every task in `cnf.all_tasks`."""
    train_set.clear()
    test_set.clear()
    for some_task in cnf.all_tasks:
        train_set[some_task] = [[] for _ in range(max_length)]
        test_set[some_task] = [[] for _ in range(max_length)]


def collect_bins():
    """Merge per-length data into the configured bins, then top up and shuffle.

    Steps, in order:
      1. Every length L below `cnf.bins[-1]` that is not itself a bin is moved
         into the bin returned by `pad(L)`.
      2. Each training bin (except the smallest) receives extra items drawn
         from randomly chosen smaller bins, about 5% of its own size.
      3. Training bins are shuffled; test bins are shuffled unless
         `cnf.musicnet_visualise` is set.
    """
    max_length = cnf.bins[-1]
    for some_task in cnf.all_tasks:
        for L in range(max_length):
            bin_length = pad(L)
            if bin_length != L:
                cur_train = train_set[some_task]
                cur_test = test_set[some_task]
                cur_train[bin_length] += cur_train[L]
                cur_test[bin_length] += cur_test[L]
                cur_train[L] = []
                cur_test[L] = []

    # add some shorter instances to train for padding
    for some_task in cnf.all_tasks:
        for ind in range(1, len(cnf.bins)):
            small_count = len(train_set[some_task][cnf.bins[ind]]) // 20  # 5% shorter instances
            for itemNr in range(small_count):
                smaller_bin = cnf.bins[random.randint(0, ind - 1)]
                if len(train_set[some_task][smaller_bin]) > 0:
                    item = random.choice(train_set[some_task][smaller_bin])
                    train_set[some_task][cnf.bins[ind]].append(item)

    # shuffle randomly
    for some_task in cnf.all_tasks:
        for L in cnf.bins:
            random.shuffle(train_set[some_task][L])
            if not cnf.musicnet_visualise:
                random.shuffle(test_set[some_task][L])


def init_data(task, length, nbr_cases, nclass):
    """Generate `nbr_cases` cases of `task` at `length` for both the train and test sets."""
    init_data_1(task, length, nbr_cases, nclass, train_set)
    init_data_1(task, length, nbr_cases, nclass, test_set)


def init_data_1(task, length, nbr_cases, nclass, cur_set):
    """Data initialization.

    Clears `cur_set[task][length]` and fills `cur_set[task]` with up to
    `nbr_cases` distinct [input, target] pairs produced by the generator from
    `tasks.select_task(task, nclass)`. Each pair is stored under the bin
    indexed by the actual input length. Generation stops early when the
    generator returns an empty input or after 20 consecutive duplicates.
    """
    cur_set[task][length] = []
    cur_time = time.time()
    total_time = 0.0
    input_set = set()  # inputs seen so far, used to reject duplicates
    case_count = 0
    trials = 0  # consecutive duplicate draws
    task_gen = tasks.select_task(task, nclass)
    while case_count < nbr_cases and trials < 20:
        total_time += time.time() - cur_time
        cur_time = time.time()
        # case_count % 100 == 1 implies case_count >= 1, so the division is safe.
        if length > cnf.bin_max_len and case_count % 100 == 1:
            print_out("  avg gen time %.4f s" % (total_time / float(case_count)))
        inp, target = task_gen.input_output_pair(length)
        if len(inp) == 0:
            break
        inp_key = tuple(inp)
        if inp_key not in input_set:
            input_set.add(inp_key)
            cur_set[task][len(inp)].append([inp, target])
            case_count += 1
            trials = 0
        else:
            trials += 1


def get_batch(max_length, batch_size, do_train, task, offset=None, preset=None):
    """Get a batch of data, training or testing.

    Without `preset`, items come from the train or test set of `task`
    (selected by `do_train`), taken from the largest non-empty bin at or below
    `max_length`. The per-bin counter is advanced for every item and the bin
    is reshuffled whenever the counter wraps around. When `offset` is given
    and `offset + b` is inside the bin, that element is used for item `b`
    instead (the counter is still advanced).

    With `preset`, the same [input, target] pair is used for every item.

    Returns:
        (inputs, targets): two lists of `batch_size` sequences, each padded to
        `max_length` by `add_padding`.
    """
    inputs = []
    targets = []
    length = max_length
    if preset is None:
        if do_train:
            cur_set = train_set[task]
            counters = train_counters
        else:
            cur_set = test_set[task]
            counters = test_counters
        # Fall back to shorter bins until one with data is found.
        while not cur_set[length]:
            length -= 1
            assert length, "Bin in length {len} is empty. Expected to contain values".format(len=max_length)

    for b in range(batch_size):
        if preset is None:
            cur_ind = counters[length]
            elem = cur_set[length][cur_ind]
            cur_ind += 1
            if cur_ind >= len(cur_set[length]):
                random.shuffle(cur_set[length])
                cur_ind = 0
            counters[length] = cur_ind
            if offset is not None and offset + b < len(cur_set[length]):
                elem = cur_set[length][offset + b]
        else:
            elem = preset

        inp, target = elem[0], elem[1]
        # Report the length of the input, as the message label says, rather
        # than dumping the whole sequence.
        assert len(inp) <= length, "Input len {inp}; Length {length}".format(inp=len(inp), length=length)
        padded_input, padded_target = add_padding(inp, target, max_length)
        inputs.append(padded_input)
        targets.append(padded_target)

    return inputs, targets


def add_padding(inp: list, target: list, max_length: int):
    """Zero-pad `inp` and `target` to `max_length`.

    When `cnf.disperse_padding` is set, zeros are first interleaved into both
    sequences by `disperse_padding`. When `cnf.use_front_padding` is set, a
    random number of the padding zeros (the same for both sequences) is placed
    in front; the rest goes at the end.

    Returns:
        (padded_input, padded_target) as produced by `np.concatenate`.
    """
    if cnf.disperse_padding:
        inp, target = disperse_padding(inp, max_length, target)
    pad_len_input = max_length - len(inp)
    pad_len_output = max_length - len(target)
    pad_len_before = 0
    if cnf.use_front_padding:
        # Front padding may not exceed what either sequence can spare.
        pad_len_before = np.random.randint(min(pad_len_input, pad_len_output) + 1)
    pad_before = np.zeros([pad_len_before])
    padded_input = np.concatenate([pad_before, inp, np.zeros([pad_len_input - pad_len_before])])
    padded_target = np.concatenate([pad_before, target, np.zeros([pad_len_output - pad_len_before])])
    return padded_input, padded_target


def disperse_padding(inp, max_length, target):
    """Insert zeros at random, identical positions of `inp` and `target`.

    A result length is drawn from [len(inp), max_length]. Walking over the
    result positions, each one receives either the next original symbol or a
    zero; the draw is bounded so that all original symbols are always emitted,
    in order.

    Returns:
        (res_in, res_out): two lists of the drawn length.
    """
    assert len(inp) == len(target)
    desired_length = np.random.randint(len(inp), max_length + 1)
    cur_symbol = 0
    res_in = []
    res_out = []
    for i in range(desired_length):
        remaining_symbols = len(inp) - cur_symbol
        # A draw below `remaining_symbols` emits a symbol; once the remaining
        # slots equal the remaining symbols the draw can no longer emit a zero.
        if np.random.randint(desired_length - i) >= remaining_symbols:
            res_in.append(0)
            res_out.append(0)
        else:
            res_in.append(inp[cur_symbol])
            res_out.append(target[cur_symbol])
            cur_symbol += 1
    remaining_symbols = len(inp) - cur_symbol
    assert remaining_symbols == 0
    assert len(res_in) == desired_length
    assert len(res_out) == desired_length
    return res_in, res_out


def print_out(s, newline=True):
    """Print a message out and log it to file.

    The message is appended to `cnf.log_filename` when that is set. Logging is
    best effort: a failure to write is reported on stdout and the message is
    still printed.
    """
    text = s + ("\n" if newline else "")
    if cnf.log_filename:
        try:
            with gfile.GFile(cnf.log_filename, mode="a") as f:
                f.write(text)
        except Exception:
            # Not a bare `except:` so KeyboardInterrupt / SystemExit still propagate.
            sys.stdout.write("Error appending to %s\n" % cnf.log_filename)
    sys.stdout.write(text)
    sys.stdout.flush()


def accuracy(inpt, output, target, batch_size, nprint):
    """Calculate output accuracy given target.

    A position counts towards `total` when either the target or the output
    value there is greater than zero; it counts as an error when the two
    values differ. Up to `nprint` sequences containing errors are printed via
    `print_out`.

    Returns:
        (errors, total, wrong_sequences): number of mismatching positions,
        number of counted positions, and number of sequences with at least
        one error.
    """
    assert nprint < batch_size + 1

    def task_print(inp, output, target):
        """Print input, output and target, truncated to the input's length."""
        print_len = len(inp)
        print_out("    i: " + " ".join([str(i) for i in inp]))
        print_out("    o: " +
                  " ".join([str(output[L]) for L in range(print_len)]))
        print_out("    t: " +
                  " ".join([str(target[L]) for L in range(print_len)]))

    decoded_target = target
    decoded_output = output
    total = 0
    errors = 0
    seq = [0 for _ in range(batch_size)]  # 1 where the sequence has an error
    for L in range(len(decoded_output[0])):
        for b in range(batch_size):
            if decoded_target[b][L] > 0 or decoded_output[b][L] > 0:
                total += 1
                if decoded_output[b][L] != decoded_target[b][L]:
                    seq[b] = 1
                    errors += 1
    e = 0  # Previous error index
    for _ in range(min(nprint, sum(seq))):
        while seq[e] == 0:
            e += 1
        task_print(inpt[e], decoded_output[e], decoded_target[e])
        e += 1
    return errors, total, sum(seq)


def print_bin_usage():
    """Print how many train/test cases of `cnf.task` exist in total and inside `cnf.bins`."""
    test_cases = 0
    train_cases = 0
    test_cases_bins = 0
    train_cases_bins = 0
    test = test_set[cnf.task]
    train = train_set[cnf.task]
    for i in range(cnf.bin_max_len):
        train_cases += len(train[i])
        test_cases += len(test[i])
        if i in cnf.bins:
            test_cases_bins += len(test[i])
            train_cases_bins += len(train[i])
    print("\n------------------- BIN USAGE INFO -------------------")
    print("Train cases total:", train_cases, "In bins:", train_cases_bins)
    print("Test cases total:", test_cases, "; In bins:", test_cases_bins)
    # ... (the listing is truncated at this point on the source page)

# Next listing on the page: test_testing_utils.py
Source:test_testing_utils.py  
# ... (lines 1-3 of this listing, presumably its imports, are not shown)
def test_assert_num_tasks():
    """Passing one function to a crosstown_traffic() decorator yields a task count of 1."""
    a_mixin = AsyncTestMixin()
    a_mixin.sub_setUp()
    a_mixin.assertNumCrosstownTasks(0)

    def some_task():
        pass

    through_to_you = crosstown_traffic()
    through_to_you(some_task)
    a_mixin.assertNumCrosstownTasks(1)


def test_next_task():
    """next_task() hands back the very function that was registered."""
    a_mixin = AsyncTestMixin()
    a_mixin.sub_setUp()
    a_mixin.assertNumCrosstownTasks(0)

    def some_task():
        pass

    through_to_you = crosstown_traffic()
    through_to_you(some_task)
    assert some_task is a_mixin.next_task()


def test_no_more_tasks():
    """Asking for a task after the only one was consumed raises StopIteration."""
    a_mixin = AsyncTestMixin()
    a_mixin.sub_setUp()
    a_mixin.assertNumCrosstownTasks(0)

    def some_task():
        pass

    through_to_you = crosstown_traffic()
    through_to_you(some_task)
    a_mixin.next_task()
    # That will be the only (and thus last) task.
    with pytest.raises(StopIteration):
        a_mixin.next_task()


def test_record_task_to_list():
    task_list = []
    crosstown_traffic.decorator = crosstownTaskListDecoratorFactory(task_list)
    assert len(task_list) == 0

    @crosstown_traffic()
    def add_to_task_list():
        pass
    # ... (the listing is truncated at this point on the source page)

# Next listing on the page: prog46_Function_Caching.py
Source:prog46_Function_Caching.py  
# ... (lines 1-3 of this listing are not shown)
from functools import lru_cache
import time


@lru_cache(maxsize=3)  # keep the results of up to 3 most recently used calls
def some_task(n):
    """Sleep for `n` seconds and return `n`; repeated calls with a cached `n` return immediately."""
    # do some stuff here
    time.sleep(n)  # stand-in for work that takes a long time
    return n


if __name__ == "__main__":
    print("Doing some task.")
    some_task(3)
    print("Calling again ... Doing some task.")
    some_task(3)
    print("Calling once again ... Doing some task.")
...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
