Best Python code snippet using localstack_python
environment.py
Source:environment.py  
1# Authored by elia on 22/10/2020 2# Feature: #Enter feature name here3# Enter feature description here4# Scenario: # Enter scenario name here5"""6"""7import gym8from gym import spaces9import numpy as np10from main.env.data import *11from main.util.binary_ops import *12from main.util.java_utils import dlf_analyse13from main.util.model_utils import *14np.seterr(all='raise')15class Environment(gym.Env):16    """Custom Environment that follows gym interface"""17    metadata = {'render.modes': ['human']}18    """19        Arguments:20            grid_name: string (name of the electrical grid)21    22            action_type: string ("continous" or "discrete")23    """24    def __init__(self, grid_name="bus33", action_type="discrete", load_shedding=100):25        super(Environment, self).__init__()26        self.grid_name = grid_name27        self.action_type = action_type28        # self.load_shedding = load_shedding29        self.load_data, self.line_data = get_data_from_csv(grid_name)30        self.max_capacity, _ = get_mva_kva(self.grid_name)  # MVa to KVa31        self.max_capacity = self.max_capacity * 100032        self.n_nodes = len(self.line_data) + 133        self.current_reward = self.calculate_reward(self.load_data[:, 3], self.load_data[:, 1], self.load_data[:, 4], self.load_data)34        self.reward_range = spaces.Box(low=0, high=1000, shape=(1,))  # spaces.Box(np.array(0), np.array(100))35        high = np.array([10000] * self.n_nodes)36        self.observation_space = spaces.Box(-high, high)37        if action_type.lower() == "discrete":38            self.action_space = spaces.Discrete(self.n_nodes + 1)39        elif action_type.lower() == "continous":40            # self.action_space = spaces.Box(low=0,high=1, shape=(self.n_nodes, 1), dtype=np.int)41            self.action_space = spaces.MultiBinary(self.n_nodes)42        else:43            raise Exception("Action type: {} not implemented. 
Use 'discrete' or 'continous'")44        self.num_actions = 045        self.done = True46    def get_remaining_power(self):47        power_assigned = np.sum(self.load_data[:, 1] * self.load_data[:, 3])48        return self.max_capacity - power_assigned49    def calculate_reward(self, status, load, priority, load_data):50        # calculate dlf before returning reward51        load_data_temp = load_data.copy()52        load_data_temp[:, 3] = status53        power_values_from_dlf, _ = dlf_analyse(self.line_data, load_data_temp, grid_name=self.grid_name)54        power_values_from_dlf = np.array(power_values_from_dlf)55        if not (power_values_from_dlf.min() > 0.9 and power_values_from_dlf.max() < 1.1):56            return -np.sum(status)57        return np.sum(load * status * np.square(priority))58    def step(self, action):59        # Execute one time step within the environment60        print("ACTION: {}".format(action))61        if self.action_type == "continous":62            action = np.array(action)63            action[0] = 164        obs = self.get_observation(action)65        reward = self.reward(action)66        print("REWARD {}".format(reward))67        load_data_copy = self.load_data.copy()68        load_data_copy[:, 3] = action69        power_values_from_dlf, _ = dlf_analyse(self.line_data, load_data_copy, grid_name=self.grid_name)70        power_values_from_dlf = np.array(power_values_from_dlf)71        load_data_1, line_data = get_data_from_csv(self.grid_name)72        restored_load = np.sum(load_data_1[:, 1][(load_data_copy[:, 3] == 1)]) / (np.sum(load_data_1[:, 1]) + 0.001 ) * 10073        self.done = True74        reward=reward if self.action_type == 'discrete' else reward[0]75        return obs, reward, self.done, {"min" : power_values_from_dlf.min(),"max" : power_values_from_dlf.max(), "load": restored_load}76    def reset(self):77        print("**** EPISODE STARTS ...\n")78        if self.action_type == "continous":79            # Reset the 
state of the environment to an initial state80            status = self.load_data[:, 3]81            self.load_data, self.line_data = get_data_from_csv(self.grid_name)82            self.load_data[:, 3] = status83        else:84            # Reset the state of the environment to an initial state85            status = self.load_data[:, 3]86            self.load_data, self.line_data = get_data_from_csv(self.grid_name)87            # print("Load data {}".format(self.load_data[:, 1]))88            self.load_data[:, 3] = status89        self.num_actions = 090        self.done = False91        print("OBSERVATIONS: {}".format(self.current_state()))92        return self.current_state()93    def get_observation(self, action=np.inf):94        if self.action_type == "discrete":95            if action == np.inf:96                return self.current_state()97            else:98                if action <= self.n_nodes:99                    self.act_from_num(action=action)100                    print(action)101                else:102                    action_str = get_bin_str_with_max_count(action, self.n_nodes)103                    self.act(action_str)104                    print(action_str)105                # print("CURRENT STATE: " + str(self.load_data[:, 3]))106                return self.current_state()107        elif self.action_type == "continous":108            if not isinstance(action, np.ndarray):109                if action == np.inf:110                    return self.current_state()111            print("ACTION REWARD {}".format(112                self.calculate_reward(action, self.load_data[:, 1], self.load_data[:, 4], self.load_data)))113            print("OBSERVATION REWARD {}".format(self.current_reward))114            print("PRIORITY: " + str(self.load_data[:, 4]))115            if self.calculate_reward(np.array(action), self.load_data[:, 1], self.load_data[:, 4], self.load_data) > self.current_reward:116                self.load_data[:, 3] = 
np.array(action)117                return self.load_data[:, 3]118            else:119                return self.load_data[:, 3]120    def act_from_num(self, action):121        if action == 0:122            pass123        elif self.load_data[:, 3][action - 1] == 0:124            self.load_data[:, 3][action - 1] = 1125        else:126            self.load_data[:, 3][action - 1] = 0127        self.load_data[:, 3][0] = 1128    def act(self, action_str):129        for action in range(len(action_str)):130            print(action_str)131            self.load_data[:, 3][action] = int(action_str[action])132        self.load_data[:, 3][0] = 1133        return134    def current_state(self):135        return self.load_data[:, 3]136    def restored_load_percentage(self):137        load_data, line_data = get_data_from_csv(self.grid_name)138        return np.sum(load_data[:, 1][(self.load_data[:, 3] == 1)] / np.sum(load_data[:, 1])) * 100139    def reward(self, action):140        print("CURRENT STATE: " + str(self.load_data[:, 3]))141        print("RESTORED LOAD: {}%".format(self.restored_load_percentage()))142        power_values_from_dlf, _ = dlf_analyse(self.line_data, self.load_data, grid_name=self.grid_name)143        power_values_from_dlf = np.array(power_values_from_dlf)144        # print(power_values_from_dlf)145        print("MIN VOL: {}".format(power_values_from_dlf.min()))146        print("MAX VOL: {}".format(power_values_from_dlf.max()))147        if self.action_type == "continous":148            if self.calculate_reward(np.array(action), self.load_data[:, 1],149                                     self.load_data[:, 4], self.load_data) <= self.current_reward:150                return self.calculate_reward(np.array(action), self.load_data[:, 1],151                                             self.load_data[:, 4], self.load_data) - self.current_reward152        status_reward = self.calculate_reward(self.load_data[:, 3], self.load_data[:, 1], self.load_data[:,153        
                                                                                  4], self.load_data) # np.sum(self.load_data[:, 3] * np.square(self.load_data[:, 4]))154        # print("STATUS REWARD: {}".format(status_reward))155        self.current_reward = status_reward  # divide by num_actions which is the number of episodes156        return status_reward157    def power_assigned(self):...ner_test.py
Source:ner_test.py  
...13    s = {'read_model':True,'seed':345, 'epoch':20, 'lr':0.1, 'decay':0.95, 'wsize':5, 'hnum':100 , 'dnum':100, 'ynum':17, 'wnum':6336, 'L2': 0.000001,14         'f0':33716, 'f1':34238, 'f2':34189, 'f3':33768, 'f4':42828, 'fsize':1, 'kalpha':0.2}15    16    print 'load train data'17    train_word = load_data("data/train/train.word.txt")    18    train_label = load_data("data/train/train.label.txt")19    train_f0 = load_data('data/train/trainfeature0.txt')20    train_f1 = load_data('data/train/trainfeature1.txt')21    train_f2 = load_data('data/train/trainfeature2.txt')22    train_f3 = load_data('data/train/trainfeature3.txt')23    train_f4 = load_data('data/train/trainfeature4.txt')24    print 'load sighan data'25    sighan_word = load_data("data/train_sighan/trainsighan.word.txt")    26    sighan_label = load_data("data/train_sighan/trainsighan.label.txt")27    sighan_f0 = load_data('data/train_sighan/trainsighanfeature0.txt')28    sighan_f1 = load_data('data/train_sighan/trainsighanfeature1.txt')29    sighan_f2 = load_data('data/train_sighan/trainsighanfeature2.txt')30    sighan_f3 = load_data('data/train_sighan/trainsighanfeature3.txt')31    sighan_f4 = load_data('data/train_sighan/trainsighanfeature4.txt')32    sighan_simi = load_emb('data/train_sighan/cos.txt')#cross / cos / poly / gaussian33    print 'load dev data'34    dev_word = load_data("data/test/test.word.txt")35    dev_label = load_data("data/test/test.label.txt")36    dev_f0 = load_data('data/test/testfeature0.txt')37    dev_f1 = load_data('data/test/testfeature1.txt')38    dev_f2 = load_data('data/test/testfeature2.txt')39    dev_f3 = load_data('data/test/testfeature3.txt')40    dev_f4 = load_data('data/test/testfeature4.txt')41    print 'load test data'42    test_word = load_data("data/test/test.word.txt")43    test_label = load_data("data/test/test.label.txt")44    test_f0 = load_data('data/test/testfeature0.txt')45    test_f1 = load_data('data/test/testfeature1.txt')46    test_f2 = 
load_data('data/test/testfeature2.txt')47    test_f3 = load_data('data/test/testfeature3.txt')48    test_f4 = load_data('data/test/testfeature4.txt')49    sys.stdout.flush()50    51    print 'load baseline predict data'52    baseline_label = load_data("data/semi_test_pred.txt")53    for i in range(len(baseline_label)):54        for j in range(len(baseline_label[i])):55            if int(baseline_label[i][j])<=8:56                baseline_label[i][j]=057            else:58                baseline_label[i][j]=int(baseline_label[i][j])59    60    nsentences = len(train_word)61    np.random.seed(s['seed'])62    random.seed(s['seed'])63    rnn = model(read_model=s['read_model'],hnum = s['hnum'], ynum = s['ynum'], wnum = s['wnum'], dnum = s['dnum'], wsize = s['wsize'], fsize = s['fsize'], L2 = s['L2'],64                fnum0 = s['f0'], fnum1 = s['f1'], fnum2 = s['f2'], fnum3 = s['f3'], fnum4 = s['f4'], kalpha=s['kalpha'])65    #rnn.emb = load_emb("data/embeddingsall")66    s['cur_lr'] = s['lr']...extract_peaks.py
Source:extract_peaks.py  
1import pandas as pd2import numpy as np3import datetime4from plotly import graph_objs as go5import plotly.express as px6def get_peak_days(load_data,7                    peak_level=2,8                    weekday_only=False,9                    peak_hours_only=False,10                    peak_hours=(7,18)):11    """12    load_data: input data to be analyzed,13    peak_level: amount of peaks to be considered for each month - values between 1 to 10,14    weekday_only: consider only weekdays,15    peak_hours_only: consider only peak hours,16    peak_hours: tuple with start and end of peak hours17    """18    if weekday_only:19        load_data = load_data[load_data.index.weekday <= 4]20    if peak_hours_only:21        load_data = load_data[(load_data.index.hour <= peak_hours[1]) &\22                                (load_data.index.hour >= peak_hours[0])]23    monthly_peak_days = load_data.groupby([load_data.index.month,24                                            load_data.index.year])['net_load_after_pv'].nlargest(int(peak_level*10))25    monthly_peak_days.index = monthly_peak_days.index.droplevel(0).droplevel(0)26    monthly_peak_days = monthly_peak_days.groupby([monthly_peak_days.index.year,27                                                    monthly_peak_days.index.month,28                                                    monthly_peak_days.index.date]).max()29    return list(monthly_peak_days.index.levels[2])30def get_peak_statistics(load_data,31                        peak_days=False,32                        peak_level=3,33                        weekday_only=False,34                        peak_hours_only=False,35                        peak_hours=(7,18)):36    """37    load_data: input data to be analyzed,38    peak_days: consider only peak days identified from get_peak_days()39    peak_level: amount of peaks to be considered for each month - values between 1 to 10,40    weekday_only: consider only weekdays,41    peak_hours_only: consider only peak hours,42  
  peak_hours: tuple with start and end of peak hours43    """44    title = "all days"45    if weekday_only:46        load_data = load_data[load_data.index.weekday <= 4]47    if peak_hours_only:48        load_data = load_data[(load_data.index.hour <= peak_hours[1]) &\49                                (load_data.index.hour >= peak_hours[0])]50    if peak_days:51        title = "selected peak days"52        load_data = load_data.loc[np.isin(load_data.index.date,53                                            get_peak_days(load_data=load_data, peak_level=peak_level,54                                                           weekday_only=weekday_only,55                                                           peak_hours_only=peak_hours_only,56                                                           peak_hours=peak_hours))]57    peak_time = load_data['net_load_after_pv'].groupby(load_data.index.date).idxmax()58    peak_hours = peak_time.dt.hour59    peak_hours = pd.DataFrame(peak_hours.values, index = peak_hours.index)60    peak_hours['peak'] = 161    peak_hours.columns = ['hour', 'peak']62    peak_distribution = peak_hours.groupby('hour').sum()63    fig = px.bar(peak_distribution, x=peak_distribution.index, y='peak')64    fig.update_xaxes(range=[0, 24])65    fig.update_layout(title=f"Peak Distribution for {title}")...process_csv_util.py
Source:process_csv_util.py  
1import calendar2import holidays3def add_year(load_data):4    load_data['Year'] =   load_data.apply(lambda row: row['Date'].year, axis=1)5    return load_data6def add_month(load_data):7    load_data['Month'] =   load_data.apply(lambda row: row['Date'].month, axis=1)8    return load_data9def add_season(load_data):10    load_data['Season'] = load_data.apply(lambda row: calculate_season(row['Date']), axis=1)11    return load_data12def add_day(load_data):13    load_data['Day'] = load_data.apply(lambda row: row['Date'].weekday(), axis=1)14    return load_data15def add_day_of_year(load_data):16    load_data['Dayofyear'] = load_data.apply(lambda row: row['Date'].dayofyear, axis=1)17    return load_data18def add_is_weekend(load_data):19    load_data['Is_Weekend'] = load_data.apply(lambda row: row['Date'].isoweekday() > 5, axis=1)20    return load_data21def add_holiday(load_data, country, prov=None, state=None):22    load_data['Holiday'] = load_data.apply(lambda row: calculate_holiday(row['Date'], country, prov, state), axis=1)23    return load_data24def calculate_holiday(date, country, prov, state):25    holiday_list = getattr(holidays, country)(prov=prov, state=state)26    holiday = holiday_list.get(date)27    if holiday == None:28        return 'Non-Holiday'29    return holiday30def calculate_season(date):31    spring = ['March', 'April', 'May']32    summer = ['June', 'July','August']33    autumn = ['September', 'October', 'November']34    month = calendar.month_name[date.month]35    36    if (month in spring):37        return 138    elif (month in summer):39        return 240    elif (month in autumn):41        return 3...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
