train_lio.py
Source:train_lio.py  
1"""Trains LIO agents on Escape Room game.23Three versions of LIO:41. LIO built on top of policy gradient52. LIO built on top of actor-critic63. Fully decentralized version of LIO on top of policy gradient7"""89from __future__ import division10from __future__ import print_function1112import argparse13import json14import os15import random1617import numpy as np18import tensorflow as tf1920from lio.alg import config_ipd_lio21from lio.alg import config_room_lio22from lio.alg import evaluate23from lio.env import ipd_wrapper24from lio.env import room_symmetric252627def train(config):2829    seed = config.main.seed30    np.random.seed(seed)31    random.seed(seed)32    tf.set_random_seed(seed)3334    dir_name = config.main.dir_name35    exp_name = config.main.exp_name36    log_path = os.path.join('..', 'results', exp_name, dir_name)37    model_name = config.main.model_name38    save_period = config.main.save_period3940    os.makedirs(log_path, exist_ok=True)4142    # Keep a record of parameters used for this run43    with open(os.path.join(log_path, 'config.json'), 'w') as f:44        json.dump(config, f, indent=4, sort_keys=True)4546    n_episodes = int(config.alg.n_episodes)47    n_eval = config.alg.n_eval48    period = config.alg.period4950    epsilon = config.lio.epsilon_start51    epsilon_step = (52        epsilon - config.lio.epsilon_end) / config.lio.epsilon_div5354    if config.env.name == 'er':55        env = room_symmetric.Env(config.env)56    elif config.env.name == 'ipd':57        env = ipd_wrapper.IPD(config.env)5859    if config.lio.decentralized:60        from lio_decentralized import LIO61    elif config.lio.use_actor_critic:62        from lio_ac import LIO63    else:64        from lio_agent import LIO6566    list_agents = []67    for agent_id in range(env.n_agents):68        if config.lio.decentralized:69            list_agent_id_opp = list(range(env.n_agents))70            del list_agent_id_opp[agent_id]71            list_agents.append(LIO(config.lio, env.l_obs, env.l_action,72                                   config.nn, 'agent_%d' % agent_id,73                                   config.env.r_multiplier, env.n_agents,74                                   agent_id, list_agent_id_opp))75        else:76            list_agents.append(LIO(config.lio, env.l_obs, env.l_action,77                                   config.nn, 'agent_%d' % agent_id,78                                   config.env.r_multiplier, env.n_agents,79                                   agent_id))        8081    for agent_id in range(env.n_agents):82        if config.lio.decentralized:83            list_agents[agent_id].create_opp_modeling_op()84        else:85            list_agents[agent_id].receive_list_of_agents(list_agents)86        list_agents[agent_id].create_policy_gradient_op()87        list_agents[agent_id].create_update_op()88        if config.lio.use_actor_critic:89            list_agents[agent_id].create_critic_train_op()9091    for agent_id in range(env.n_agents):92        list_agents[agent_id].create_reward_train_op()9394    # This handles the special case of two asymmetric agents,95    # one of which is the reward-giver and the other is the recipient96    if config.lio.asymmetric:97        assert config.env.n_agents == 298        for agent_id in range(env.n_agents):99            list_agents[agent_id].set_can_give(100                agent_id != config.lio.idx_recipient)101102    config_proto = tf.ConfigProto()103    if config.main.use_gpu:104        config_proto.device_count['GPU'] = 1105        
config_proto.gpu_options.allow_growth = True106    else:107        config_proto.device_count['GPU'] = 0108    sess = tf.Session(config=config_proto)109    sess.run(tf.global_variables_initializer())110111    if config.lio.use_actor_critic:112        for agent in list_agents:113            sess.run(agent.list_initialize_v_ops)114115    list_agent_meas = []116    if config.env.name == 'er':117        list_suffix = ['reward_total', 'n_lever', 'n_door',118                       'received', 'given', 'r-lever', 'r-start', 'r-door']119    elif config.env.name == 'ipd':120        list_suffix = ['given', 'received', 'reward_env',121                       'reward_total']122    for agent_id in range(1, env.n_agents + 1):123        for suffix in list_suffix:124            list_agent_meas.append('A%d_%s' % (agent_id, suffix))125126    saver = tf.train.Saver(max_to_keep=config.main.max_to_keep)127128    header = 'episode,step_train,step,'129    header += ','.join(list_agent_meas)130    if config.env.name == 'er':131        header += ',steps_per_eps\n'132    else:133        header += '\n'134    with open(os.path.join(log_path, 'log.csv'), 'w') as f:135        f.write(header)    136137    step = 0138    step_train = 0139    for idx_episode in range(1, n_episodes + 1):140141        list_buffers = run_episode(sess, env, list_agents, epsilon,142                                   prime=False)143        step += len(list_buffers[0].obs)144145        if config.lio.decentralized:146            for idx, agent in enumerate(list_agents):147                agent.train_opp_model(sess, list_buffers,148                                      epsilon)149150        for idx, agent in enumerate(list_agents):151            agent.update(sess, list_buffers[idx], epsilon)152153        list_buffers_new = run_episode(sess, env, list_agents,154                                       epsilon, prime=True)155        step += len(list_buffers_new[0].obs)156157        for agent in list_agents:158            if agent.can_give:159                agent.train_reward(sess, list_buffers,160                                   list_buffers_new, epsilon)161162        for idx, agent in enumerate(list_agents):163            if config.lio.decentralized:164                agent.train_opp_model(sess, list_buffers_new,165                                      epsilon)166            else:167                agent.update_main(sess)168169        step_train += 1170171        if idx_episode % period == 0:172173            if config.env.name == 'er':174                (reward_total, n_move_lever, n_move_door, rewards_received,175                 rewards_given, steps_per_episode, r_lever,176                 r_start, r_door) = evaluate.test_room_symmetric(177                     n_eval, env, sess, list_agents)178                matrix_combined = np.stack([reward_total, n_move_lever, n_move_door,179                                            rewards_received, rewards_given,180                                            r_lever, r_start, r_door])181            elif config.env.name == 'ipd':182                given, received, reward_env, reward_total = evaluate.test_ipd(183                    n_eval, env, sess, list_agents)184                matrix_combined = np.stack([given, received, reward_env,185                                            reward_total])186187            s = '%d,%d,%d' % (idx_episode, step_train, step)188            for idx in range(env.n_agents):189                s += ','190                if config.env.name == 'er':191                    s += 
('{:.3e},{:.3e},{:.3e},{:.3e},{:.3e},'192                          '{:.3e},{:.3e},{:.3e}').format(193                              *matrix_combined[:, idx])194                elif config.env.name == 'ipd':195                    s += '{:.3e},{:.3e},{:.3e},{:.3e}'.format(196                        *matrix_combined[:, idx])197            if config.env.name == 'er':198                s += ',%.2f\n' % steps_per_episode199            else:200                s += '\n'201            with open(os.path.join(log_path, 'log.csv'), 'a') as f:202                f.write(s)203204        if idx_episode % save_period == 0:205            saver.save(sess, os.path.join(log_path, '%s.%d'%(206                model_name, idx_episode)))207208        if epsilon > config.lio.epsilon_end:209            epsilon -= epsilon_step210211    saver.save(sess, os.path.join(log_path, model_name))212    213214def run_episode(sess, env, list_agents, epsilon, prime=False):215    list_buffers = [Buffer(env.n_agents) for _ in range(env.n_agents)]216    list_obs = env.reset()217    done = False218219    while not done:220        list_actions = []221        for agent in list_agents:222            action = agent.run_actor(list_obs[agent.agent_id], sess,223                                     epsilon, prime)224            list_actions.append(action)225226        list_rewards = []227        total_reward_given_to_each_agent = np.zeros(env.n_agents)228        for agent in list_agents:229            if agent.can_give:230                reward = agent.give_reward(list_obs[agent.agent_id],231                                           list_actions, sess)232            else:233                reward = np.zeros(env.n_agents)234            reward[agent.agent_id] = 0235            total_reward_given_to_each_agent += reward236            reward = np.delete(reward, agent.agent_id)237            list_rewards.append(reward)238239        if env.name == 'er':240            list_obs_next, env_rewards, done = env.step(list_actions, list_rewards)241        elif env.name == 'ipd':242            list_obs_next, env_rewards, done = env.step(list_actions)243244        for idx, buf in enumerate(list_buffers):245            buf.add([list_obs[idx], list_actions[idx], env_rewards[idx],246                     list_obs_next[idx], done])247            buf.add_r_from_others(total_reward_given_to_each_agent[idx])248            buf.add_action_all(list_actions)249            if list_agents[idx].include_cost_in_chain_rule:250                buf.add_r_given(np.sum(list_rewards[idx]))251252        list_obs = list_obs_next253254    return list_buffers255256257class Buffer(object):258259    def __init__(self, n_agents):260        self.n_agents = n_agents261        self.reset()262263    def reset(self):264        self.obs = []265        self.action = []266        self.reward = []267        self.obs_next = []268        self.done = []269        self.r_from_others = []270        self.r_given = []271        self.action_all = []272273    def add(self, transition):274        self.obs.append(transition[0])275        self.action.append(transition[1])276        self.reward.append(transition[2])277        self.obs_next.append(transition[3])278        self.done.append(transition[4])279280    def add_r_from_others(self, r):281        self.r_from_others.append(r)282283    def add_action_all(self, list_actions):284        self.action_all.append(list_actions)285286    def add_r_given(self, r):287        self.r_given.append(r)288289290if __name__ == '__main__':291292    parser = 
argparse.ArgumentParser()293    parser.add_argument('exp', type=str, choices=['er', 'ipd'])294    args = parser.parse_args()295296    if args.exp == 'er':297        config = config_room_lio.get_config()298    elif args.exp == 'ipd':299        config = config_ipd_lio.get_config()300
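The script is launched with the experiment name as its single positional argument (python train_lio.py er for Escape Room, python train_lio.py ipd for the Iterated Prisoner's Dilemma); everything else comes from the matching config module. The sketch below only exercises the Buffer class defined above, to show what run_episode records for one agent at each step; the names come from the code, the values are made up:

import numpy as np

buf = Buffer(n_agents=2)
# One transition: (obs, action, env_reward, obs_next, done)
buf.add([np.zeros(3), 1, 0.5, np.ones(3), False])
buf.add_r_from_others(1.2)    # incentive received from the other agent this step
buf.add_action_all([1, 0])    # joint action this step, stored for the incentive updates
assert len(buf.obs) == len(buf.r_from_others) == len(buf.action_all) == 1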
maptoTF.py
Source:maptoTF.py  
...
--> if agent_less_type = 1, then types 1, 2, 3, 4, 5, ... will be deployed
--> if agent_less_type = 2, then types 1, 3, 5, ... will be deployed
--> if agent_less_type = 3, then types 1, 4, 7, ... will be deployed
'''
def get_list_agents(agent_grid, nb_types, final_resolution, agent_spaced=1, agent_less_type=1):
    result = []
    grid2d_length = len(agent_grid)
    id_point = 0
    id_agent = 0
    for x in range(grid2d_length):
        for y in range(grid2d_length):
            if agent_grid[x][y] and x % agent_spaced == 0 and y % agent_spaced == 0:
                for i in range(1, nb_types + 1, agent_less_type):
                    element_agent = []
                    point = []
                    point.append(x)
                    point.append(y)
                    element_agent.append(point)
                    element_agent.append(id_point)
                    element_agent.append(get_type_agent(i, final_resolution)[0])
                    element_agent.append(get_type_agent(i, final_resolution)[1])
                    element_agent.append(get_type_agent(i, final_resolution)[2])
                    element_agent.append(id_agent)
                    result.append(element_agent)
                    id_agent += 1
                id_point += 1
    return result
'''
population_grid is the population density grid
The function returns list_skills, where each skill is a 2-vector: (point, population)
'''
def get_list_skills(population_grid):
    result = []
    grid2d_length = len(population_grid)
    for x in range(grid2d_length):
        for y in range(grid2d_length):
            if population_grid[x][y] > 0:
                element_skill = []
                point = []
                point.append(x)
                point.append(y)
                element_skill.append(point)
                element_skill.append(population_grid[x][y])
                result.append(element_skill)
    return result
def get_id_skill(point, list_skills):
    list_skills_length = len(list_skills)
    for i in range(list_skills_length):
        if list_skills[i][0][0] == point[0] and list_skills[i][0][1] == point[1]:
            return i
    return -1
'''
agent_grid is a True-False grid, True iff an antenna can be deployed
population_grid is the population density grid
nb_types: the number of antenna types we want (not more than 7)
The function returns agents_to_skills, an array associating each agent id to a list of skill ids
'''
def get_agents_to_skills(agent_grid, population_grid, nb_types, list_agents, list_skills):
    result = []
    grid2d_length = len(agent_grid)
    nb_agents = len(list_agents)
    for id_agent in range(nb_agents):
        list_skills_for_id_agent = []
        neighbor_list = points_around(list_agents[id_agent][0], list_agents[id_agent][4], grid2d_length, True)
        for neighbor in neighbor_list:
            id_skill = get_id_skill(neighbor, list_skills)
            # here, if id_skill == -1 this means that point neighbor has no population
            if id_skill >= 0:
                list_skills_for_id_agent.append(id_skill)
        result.append(list_skills_for_id_agent)
    return result
'''
Based on agents_to_skills we make a new array filtering list_agents
'''
def remove_agents_with_no_skill_from_list_agents(list_agents, agents_to_skills):
    result = []
    new_id_agent = 0
    init_nb_agents = len(agents_to_skills)
    for i in range(init_nb_agents):
        if agents_to_skills[i]:
            list_agents[i][5] = new_id_agent
            new_id_agent += 1
            result.append(list_agents[i])
    return result
'''
Return a filtered list of agents_to_skills removing agents with no skill
'''
def remove_agents_with_no_skill_from_agents_to_skills(agents_to_skills):
    result = []
    init_nb_agents = len(agents_to_skills)
    for i in range(init_nb_agents):
        if agents_to_skills[i]:
            result.append(agents_to_skills[i])
    return result
##'''
##agent_grid is a True-False grid, True iff an antenna can be deployed
##population_grid is the population density grid
##nb_types: the number of antenna types we want (not more than 7)
##The function creates:
##list_agents: each agent is a 4-vector: (point, cost1, cost2, range)
##list_skills: each skill is a 2-vector: (point, population)
##agents_to_skills: an array associating each agent id to a list of skill ids
##'''
##def map_to_TF(agent_grid, population_grid, nb_types, list_agents, list_skills, agents_to_skills):
##    list_agents = []
##    list_skills = []
##    agents_to_skills = []
##
##    # computes first the list of agents using agent_grid
##    # list_agents = get_list_agents(agent_grid, nb_types)
##
##    # computes now the list of skills using population_grid
##    # list_skills = get_list_skills(population_grid)
##
##    # computes the mapping agents_to_skills
...
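Of the helpers above, get_list_skills and get_id_skill are self-contained (get_list_agents and get_agents_to_skills also rely on get_type_agent and points_around, defined elsewhere in this file). A minimal sketch on a made-up 3x3 population grid:

# Only cells with population > 0 become skills, scanned in row-major order.
population_grid = [
    [0, 5, 0],
    [0, 0, 2],
    [7, 0, 0],
]

list_skills = get_list_skills(population_grid)
print(list_skills)                         # [[[0, 1], 5], [[1, 2], 2], [[2, 0], 7]]
print(get_id_skill([1, 2], list_skills))   # 1
print(get_id_skill([0, 0], list_skills))   # -1: no population at that point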
TFtofile.py
Source:TFtofile.py
##'''
##converts the input lists into a TF instance in a file (new version)
##'''
##def TFtofile(file_name, list_agents, list_skills, agents_to_skills, dim):
##    nb_agents = len(list_agents)
##    nb_skills = len(list_skills)
##    file_name = "TF-" + file_name + ".txt"
##    f = open(file_name, 'wt')
##    f.write('P ')
##    f.write('%s %s %s\n' % (int(nb_agents), int(nb_skills), int(dim)))
##    # print skills
##    for id_skill in range(nb_skills):
##        f.write('%s (%s %s) %s\n' % (int(id_skill), int(list_skills[id_skill][0][0]), int(list_skills[id_skill][0][1]), int(list_skills[id_skill][1])))
##    # print agents to costs and skills
##    for id_agent in range(nb_agents):
##        f.write('%s ' % int(id_agent))
##        f.write('(%s %s)' % (int(list_agents[id_agent][0][0]), int(list_agents[id_agent][0][1])))
##        # cost1 (deployment cost)
##        f.write(' %s' % int(list_agents[id_agent][2]))
##        # cost2 (repair cost)
##        f.write(' %s' % int(list_agents[id_agent][3]))
##        for id_skill in agents_to_skills[id_agent]:
##            f.write(' %s' % int(id_skill))
##        f.write('\n')
##    f.close()
'''
returns the number of different points on the map where an antenna can be deployed
'''
def get_nb_id_points(list_agents):
    length_list = len(list_agents)
    return list_agents[length_list - 1][1] + 1
'''
returns the list of agents placed at the same point on the map, where the id of the point is id_point
'''
def get_list_agents_at_id_point(list_agents, id_point):
    nb_agents = len(list_agents)
    result = []
    for id_agent in range(nb_agents):
        if list_agents[id_agent][1] == id_point:
            result.append(list_agents[id_agent])
    return result
'''
converts the input lists into a TF instance in a file .tf (last version)
in addition, it creates another file .info which gives the correspondence
between each agent id and its coordinates on the map: each line is
a <id_agent> <cost1> <cost2> <range> <num_line> <num_col>
Recall that list_agents is an array where each agent is a 6-vector: (point, id_point, cost1, cost2, range, id_agent)
'''
def TFtofile(file_name, list_agents, list_skills, agents_to_skills, dim):
    nb_agents = len(list_agents)
    nb_skills = len(list_skills)
    f = open(file_name + '.tf', 'wt')
    f_info = open(file_name + '.info', 'wt')
    f.write('p ')
    f.write('%s %s\n' % (int(nb_agents), int(nb_skills)))
    f_info.write(f'p {int(nb_agents)}\n')
    # agents
    for id_agent in range(nb_agents):
        f.write('a %s %s %s' % (int(id_agent), int(list_agents[id_agent][2]), int(list_agents[id_agent][3])))
        for id_skill in agents_to_skills[id_agent]:
            f.write(' %s' % int(id_skill))
        f.write('\n')
        f_info.write(f'a {int(id_agent)} {int(list_agents[id_agent][2])} {int(list_agents[id_agent][3])} {int(list_agents[id_agent][4])} {list_agents[id_agent][0][0]} {list_agents[id_agent][0][1]}\n')
    # skills
    f_info.close()
    for id_skill in range(nb_skills):
        f.write('s %s %s\n' % (int(id_skill), int(list_skills[id_skill][1])))
    # exclusion constraints
    #nb_id_points = get_nb_id_points(list_agents)
##    print('nb_id_points: ', nb_id_points)
    #if nb_id_points != nb_agents:
    #    for id_point in range(nb_id_points):
    #        list_agents_at_id_point = get_list_agents_at_id_point(list_agents, id_point)
    #        nb_agents_at_id_point = len(list_agents_at_id_point)
    #        if nb_agents_at_id_point > 1:
    #            f.write('e')
    #            for id_agent in range(nb_agents_at_id_point):
    #                f.write(' %s' % int(list_agents_at_id_point[id_agent][5]))
    #            f.write('\n')
    f.close()
'''
converts the input lists into a TF instance in a file (simple version)
'''
def TFtofilesimple(file_name, list_agents, list_skills, agents_to_skills, dim):
    nb_agents = len(list_agents)
    nb_skills = len(list_skills)
    file_name = file_name + ".tf.txt"
    f = open(file_name, 'wt')
    f.write('P ')
    f.write('%s %s\n' % (int(nb_agents), int(nb_skills)))
    for id_agent in range(nb_agents):
        f.write('%s' % int(list_agents[id_agent][2]))
        for id_skill in agents_to_skills[id_agent]:
            f.write(' %s' % int(id_skill))
        f.write('\n')
    f.close()
'''
saves the points associated with agent ids to a file
The format is:
First line: <nb_agents> <dim_grid>
For each subsequent line: x_coord y_coord range
'''
def TFtomapagentidstopointsrange(file_name, list_agents, dim):
    nb_agents = len(list_agents)
    file_name = file_name + ".idtoinfo.txt"
    f = open(file_name, 'wt')
    f.write('%s %s\n' % (len(list_agents), dim))
    for id_agent in range(nb_agents):
        f.write('%s %s %s\n' % (int(list_agents[id_agent][0][0]), int(list_agents[id_agent][0][1]), int(list_agents[id_agent][4])))
...
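A minimal sketch of TFtofilesimple on a hand-built instance, assuming the 6-vector agent layout (point, id_point, cost1, cost2, range, id_agent) described in the TFtofile docstring above; all values are made up:

# Two agents and two skills, shaped like the output of maptoTF.py.
list_agents = [
    [[0, 1], 0, 10, 3, 2, 0],   # point, id_point, cost1, cost2, range, id_agent
    [[2, 0], 1, 15, 4, 2, 1],
]
list_skills = [
    [[0, 1], 5],                # point, population
    [[2, 0], 7],
]
agents_to_skills = [[0], [1]]   # agent 0 covers skill 0, agent 1 covers skill 1

TFtofilesimple('demo', list_agents, list_skills, agents_to_skills, dim=3)
# demo.tf.txt then contains:
# P 2 2
# 10 0
# 15 1

Note that dim is accepted but unused by the simple format; it matters for the map file written by TFtomapagentidstopointsrange.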
