Best Python code snippet using localstack_python
test_clustering_simple.py
Source:test_clustering_simple.py  
1import numpy as np2import unittest3import covid19sim.frozen.clustering.simple as clu4import covid19sim.frozen.message_utils as mu5from tests.utils import FakeHuman, generate_received_messages, generate_random_messages, Visit6never = 9999  # dirty macro to indicate a human will never get infected7class SimpleClusteringTests(unittest.TestCase):8    # note: we only ever build & test clusters for a single human, assuming it would also work for others9    def test_same_day_visit_clusters(self):10        n_trials = 10011        for _ in range(n_trials):12            # scenario: single day visits, 1 cluster per visit, not enough visits to overlap the clusters13            visits = [14                Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=2),15                Visit(visitor_real_uid=2, visited_real_uid=0, exposition=False, timestamp=2),16            ]17            # we will cheat and hard-code some initial 4-bit uids to make sure there is no overlap18            humans = [19                FakeHuman(20                    real_uid=idx,21                    exposition_timestamp=never,22                    visits_to_adopt=visits,23                    # with only two visitors, there should never be cluster uid overlap at day 224                    force_init_uid=np.uint8(idx),25                ) for idx in range(3)26            ]27            day2_uids = [h.rolling_uids[2] for h in humans]28            self.assertTrue(len(np.unique(day2_uids)) == 3)29            messages = generate_received_messages(humans)30            # now we need to get all messages sent to human 0 to do our actual clustering analysis31            h0_messages = messages[0]["received_messages"]32            self.assertEqual(len(h0_messages), 3)  # three timesteps in book33            self.assertEqual(sum([len(msgs) for msgs in h0_messages.values()]), 2)34            self.assertEqual(len(h0_messages[2]), 2)35            h0_messages = [msg for msgs in h0_messages.values() for msg in msgs]36            cluster_manager = clu.SimplisticClusterManager(max_history_ticks_offset=never)37            cluster_manager.add_messages(h0_messages)38            self.assertEqual(len(cluster_manager.clusters), 2)39            self.assertEqual(cluster_manager.latest_refresh_timestamp, 2)40            expositions = cluster_manager._get_expositions_array()41            self.assertEqual(len(expositions), 2)42            self.assertEqual(sum(expositions), 0)43            embeddings = cluster_manager.get_embeddings_array()44            self.assertTrue((embeddings[:, 1] == 0).all())  # risk level45            self.assertTrue((embeddings[:, 2] == 1).all())  # message count46            self.assertTrue((embeddings[:, 3] == 0).all())  # timestamp offset47    def test_same_day_visit_clusters_overlap(self):48        # scenario: single day visits, and some visits will share the same cluster49        visits = [50            Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=0),51            Visit(visitor_real_uid=2, visited_real_uid=0, exposition=False, timestamp=0),52            Visit(visitor_real_uid=3, visited_real_uid=0, exposition=False, timestamp=0),53            Visit(visitor_real_uid=4, visited_real_uid=0, exposition=False, timestamp=0),54            Visit(visitor_real_uid=5, visited_real_uid=0, exposition=False, timestamp=0),55        ]56        # we will cheat and hard-code some initial 4-bit uids to make sure there is overlap57        humans = [58            FakeHuman(59                real_uid=idx,60                exposition_timestamp=never,61                visits_to_adopt=visits,62                force_init_uid=np.uint8(max(idx - 1, 0) % 3),63                force_init_risk=np.uint8(7),64            ) for idx in range(6)65        ]66        day0_visitor_uids = [h.rolling_uids[0] for h in humans[1:]]67        self.assertTrue(len(np.unique(day0_visitor_uids)) == 3)  # two visits will be overlapped68        messages = generate_received_messages(humans)69        h0_messages = messages[0]["received_messages"]70        self.assertEqual(len(h0_messages), 1)  # single timestep in book71        self.assertEqual(len(h0_messages[0]), 5)  # all 5 encounter messages in day 072        h0_messages = [msg for msgs in h0_messages.values() for msg in msgs]73        cluster_manager = clu.SimplisticClusterManager(max_history_ticks_offset=never)74        cluster_manager.add_messages(h0_messages)75        self.assertEqual(len(cluster_manager.clusters), 3)76        self.assertEqual(cluster_manager.latest_refresh_timestamp, 0)77        expositions = cluster_manager._get_expositions_array()78        self.assertTrue(len(expositions) == 3 and sum(expositions) == 0)79        embeddings = cluster_manager.get_embeddings_array()80        self.assertTrue((embeddings[:, 1] == 7).all())  # risk level81        self.assertTrue(np.logical_and(embeddings[:, 2] > 0, embeddings[:, 2] < 3).all())82        self.assertTrue((embeddings[:, 3] == 0).all())  # timestamp offset83    def test_cluster_risk_update(self):84        # scenario: single day visits, and some visits will share the same cluster85        visits = [86            Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=0),87        ]88        humans = [89            FakeHuman(real_uid=0, exposition_timestamp=never, visits_to_adopt=visits,90                      force_init_risk=np.uint8(0)),91            FakeHuman(real_uid=1, exposition_timestamp=never, visits_to_adopt=visits,92                      force_init_risk=np.uint8(7)),93        ]94        messages = generate_received_messages(humans)95        h0_messages = messages[0]["received_messages"]96        self.assertEqual(len(h0_messages), 1)  # single timestep in book97        self.assertEqual(len(h0_messages[0]), 1)  # single encounter message in day 098        h0_messages = [msg for msgs in h0_messages.values() for msg in msgs]99        cluster_manager = clu.SimplisticClusterManager(max_history_ticks_offset=never)100        cluster_manager.add_messages(h0_messages)101        self.assertEqual(len(cluster_manager.clusters), 1)102        self.assertEqual(cluster_manager.clusters[0].risk_level, np.uint8(7))103        # we will add a manual update for one of the visits104        cluster_manager.add_messages([105            mu.create_update_message(h0_messages[0], np.uint8(9), np.uint64(1))106        ])107        self.assertEqual(len(cluster_manager.clusters), 1)108        # add a new encounter: it should not match the existing cluster due to diff risk109        cluster_manager.add_messages([110            mu.EncounterMessage(humans[1].rolling_uids[0], risk_level=np.uint8(1), encounter_time=0)111        ])112        self.assertEqual(len(cluster_manager.clusters), 2)113        self.assertEqual(cluster_manager.clusters[1].risk_level, np.uint8(1))114        # add a new encounter: it should match the existing cluster due to same risk115        new_encounter = \116            mu.EncounterMessage(humans[1].rolling_uids[0], risk_level=np.uint8(7), encounter_time=0)117        cluster_manager.add_messages([new_encounter])118        self.assertEqual(len(cluster_manager.clusters), 2)119        self.assertEqual(len(cluster_manager.clusters[0].messages), 2)120        # update one of the two encounters in the first cluster; average risk should change121        cluster_manager.add_messages([122            mu.create_update_message(new_encounter, np.uint8(13), np.uint64(1))123        ])124        self.assertEqual(len(cluster_manager.clusters), 2)125        self.assertEqual(len(cluster_manager.clusters[0].messages), 2)126    def test_cleanup_outdated_cluster(self):127        # scenario: a new encounter is added that is waaay outdated; it should not create a cluster128        visits = [129            Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=2),130            Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=5),131            Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=8),132        ]133        humans = [134            FakeHuman(real_uid=0, exposition_timestamp=never, visits_to_adopt=visits),135            FakeHuman(real_uid=1, exposition_timestamp=never, visits_to_adopt=visits),136        ]137        messages = generate_received_messages(humans)138        h0_messages = messages[0]["received_messages"]139        self.assertEqual(len(h0_messages), 9)140        h0_messages = [msg for msgs in h0_messages.values() for msg in msgs]141        cluster_manager = clu.SimplisticClusterManager(max_history_ticks_offset=5)142        cluster_manager.add_messages(h0_messages)143        self.assertEqual(len(cluster_manager.clusters), 2)144        self.assertEqual(cluster_manager.clusters[0].first_update_time, np.uint8(5))145        self.assertEqual(cluster_manager.clusters[1].first_update_time, np.uint8(8))146        # new manually added encounters that are outdated should also be ignored147        cluster_manager.add_messages([148            mu.EncounterMessage(humans[1].rolling_uids[0], risk_level=np.uint8(1), encounter_time=0)149        ])150        self.assertEqual(len(cluster_manager.clusters), 2)151        self.assertEqual(cluster_manager.clusters[0].first_update_time, np.uint8(5))152        self.assertEqual(cluster_manager.clusters[1].first_update_time, np.uint8(8))153    def test_random_large_scale(self):154        n_trials = 10155        n_humans = 50156        n_visits = 2000157        n_expositions = 15158        max_timestamp = 10159        for _ in range(n_trials):160            h0_messages, visits = generate_random_messages(161                n_humans=n_humans,162                n_visits=n_visits,163                n_expositions=n_expositions,164                max_timestamp=max_timestamp,165            )166            cluster_manager = clu.SimplisticClusterManager(max_history_ticks_offset=never)167            cluster_manager.add_messages(h0_messages)168            self.assertLessEqual(169                len(cluster_manager.clusters),170                (mu.message_uid_mask + 1) * (mu.risk_level_mask + 1) * (max_timestamp + 1)171            )172            homogeneity_scores = cluster_manager._get_homogeneity_scores()173            for id in homogeneity_scores:174                self.assertLessEqual(homogeneity_scores[id], 1.0)175                min_homogeneity = 1 / sum([v.visited_real_uid == 0 for v in visits])176                self.assertLessEqual(min_homogeneity, homogeneity_scores[id])177if __name__ == "__main__":...test_clustering_blind.py
Source:test_clustering_blind.py  
1import numpy as np2import unittest3import covid19sim.frozen.clustering.blind as clu4import covid19sim.frozen.message_utils as mu5from tests.utils import FakeHuman, generate_received_messages, generate_random_messages, Visit6never = 9999  # dirty macro to indicate a human will never get infected7class BlindClusteringTests(unittest.TestCase):8    # note: we only ever build & test clusters for a single human, assuming it would also work for others9    def test_same_day_visit_clusters(self):10        n_trials = 10011        for _ in range(n_trials):12            # first scenario: single day visits, same risk for both visitors, should give 1 cluster13            visits = [14                Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=2),15                Visit(visitor_real_uid=2, visited_real_uid=0, exposition=False, timestamp=2),16            ]17            humans = [FakeHuman(real_uid=idx, exposition_timestamp=never, visits_to_adopt=visits)18                      for idx in range(3)]19            messages = generate_received_messages(humans)20            h0_messages = [msg for msgs in messages[0]["received_messages"].values() for msg in msgs]21            cluster_manager = clu.BlindClusterManager(max_history_ticks_offset=never)22            cluster_manager.add_messages(h0_messages)23            self.assertEqual(len(cluster_manager.clusters), 1)24            self.assertEqual(cluster_manager.latest_refresh_timestamp, 2)25            embeddings = cluster_manager.get_embeddings_array()26            self.assertEqual(len(embeddings), 1)27            self.assertTrue((embeddings[:, 1] == 0).all())  # risk level28            self.assertTrue((embeddings[:, 2] == 2).all())  # message count29            self.assertTrue((embeddings[:, 3] == 0).all())  # timestamp offset30            # 2nd scenario: single day visits, diff risk for both visitors, should give 2 cluster31            humans = [FakeHuman(real_uid=idx, exposition_timestamp=never,32                                visits_to_adopt=visits, force_init_risk=mu.RiskLevelType(idx))33                      for idx in range(3)]34            messages = generate_received_messages(humans, minimum_risk_level_for_updates=1)35            h0_messages = [msg for msgs in messages[0]["received_messages"].values() for msg in msgs]36            cluster_manager = clu.BlindClusterManager(max_history_ticks_offset=never)37            cluster_manager.add_messages(h0_messages)38            self.assertEqual(len(cluster_manager.clusters), 2)39            self.assertEqual(cluster_manager.latest_refresh_timestamp, 2)40            expositions = cluster_manager._get_expositions_array()41            self.assertTrue(len(expositions) == 2 and sum(expositions) == 0)42            embeddings = cluster_manager.get_embeddings_array()43            self.assertEqual(len(embeddings), 2)44            self.assertTrue((embeddings[:, 1] > 0).all())  # risk level45            self.assertTrue((embeddings[:, 2] == 1).all())  # message count46            self.assertTrue((embeddings[:, 3] == 0).all())  # timestamp offset47    def test_cluster_risk_update(self):48        # scenario: single encounter that gets updated (should remain at 1 cluster)49        visits = [50            Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=0),51        ]52        humans = [53            FakeHuman(real_uid=0, exposition_timestamp=never, visits_to_adopt=visits,54                      force_init_risk=np.uint8(0)),55            FakeHuman(real_uid=1, exposition_timestamp=never, visits_to_adopt=visits,56                      force_init_risk=np.uint8(7)),57        ]58        messages = generate_received_messages(humans)59        h0_messages = [msg for msgs in messages[0]["received_messages"].values() for msg in msgs]60        cluster_manager = clu.BlindClusterManager(max_history_ticks_offset=never)61        cluster_manager.add_messages(h0_messages)62        self.assertEqual(len(cluster_manager.clusters), 1)63        self.assertEqual(cluster_manager.clusters[0].risk_level, np.uint8(7))64        cluster_manager.add_messages([65            mu.create_update_message(h0_messages[0], np.uint8(9), np.uint64(1))66        ])67        self.assertEqual(len(cluster_manager.clusters), 1)68        self.assertEqual(cluster_manager.clusters[0].risk_level, np.uint8(9))69        # add a new encounter: it should not match the existing cluster due to diff risk70        cluster_manager.add_messages([71            mu.EncounterMessage(mu.create_new_uid(), risk_level=np.uint8(1), encounter_time=0)72        ])73        self.assertEqual(len(cluster_manager.clusters), 2)74        self.assertEqual(cluster_manager.clusters[1].risk_level, np.uint8(1))75        # add a new encounter: it should match the existing cluster due to same risk76        new_encounter = \77            mu.EncounterMessage(mu.create_new_uid(), risk_level=np.uint8(9), encounter_time=0)78        cluster_manager.add_messages([new_encounter])79        self.assertEqual(len(cluster_manager.clusters), 2)80        self.assertEqual(cluster_manager.clusters[0].risk_level, np.uint8(9))81        self.assertEqual(len(cluster_manager.clusters[0].messages), 2)82        # update one of the two encounters in the first cluster; it should get split83        cluster_manager.add_messages([84            mu.create_update_message(new_encounter, np.uint8(13), np.uint64(1))85        ])86        self.assertEqual(len(cluster_manager.clusters), 3)87        self.assertEqual(cluster_manager.clusters[0].risk_level, np.uint8(9))88        self.assertEqual(cluster_manager.clusters[1].risk_level, np.uint8(13))89        self.assertEqual(cluster_manager.clusters[2].risk_level, np.uint8(1))90        self.assertTrue(all([len(c.messages) == 1 for c in cluster_manager.clusters]))91    def test_cleanup_outdated_cluster(self):92        # scenario: a new encounter is added that is waaay outdated; it should not create a cluster93        visits = [94            Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=2),95            Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=5),96            Visit(visitor_real_uid=1, visited_real_uid=0, exposition=False, timestamp=8),97        ]98        humans = [99            FakeHuman(real_uid=0, exposition_timestamp=never, visits_to_adopt=visits),100            FakeHuman(real_uid=1, exposition_timestamp=never, visits_to_adopt=visits),101        ]102        messages = generate_received_messages(humans)103        h0_messages = [msg for msgs in messages[0]["received_messages"].values() for msg in msgs]104        cluster_manager = clu.BlindClusterManager(max_history_ticks_offset=5)105        cluster_manager.add_messages(h0_messages)106        self.assertEqual(len(cluster_manager.clusters), 2)107        self.assertEqual(cluster_manager.clusters[0].first_update_time, np.uint8(5))108        self.assertEqual(cluster_manager.clusters[1].first_update_time, np.uint8(8))109        # new manually added encounters that are outdated should also be ignored110        cluster_manager.add_messages([111            mu.EncounterMessage(mu.create_new_uid(), risk_level=np.uint8(1), encounter_time=0)112        ])113        self.assertEqual(len(cluster_manager.clusters), 2)114        self.assertEqual(cluster_manager.clusters[0].first_update_time, np.uint8(5))115        self.assertEqual(cluster_manager.clusters[1].first_update_time, np.uint8(8))116    def test_random_large_scale(self):117        n_trials = 25118        n_humans = 50119        n_visits = 2000120        n_expositions = 15121        max_timestamp = 10122        for _ in range(n_trials):123            h0_messages, visits = generate_random_messages(124                n_humans=n_humans,125                n_visits=n_visits,126                n_expositions=n_expositions,127                max_timestamp=max_timestamp,128            )129            cluster_manager = clu.BlindClusterManager(max_history_ticks_offset=never)130            cluster_manager.add_messages(h0_messages)131            self.assertLessEqual(132                len(cluster_manager.clusters),133                (mu.message_uid_mask + 1) * (mu.risk_level_mask + 1) * (max_timestamp + 1)134            )135            homogeneity_scores = cluster_manager._get_homogeneity_scores()136            for id in homogeneity_scores:137                self.assertLessEqual(homogeneity_scores[id], 1.0)138                min_homogeneity = 1 / sum([v.visited_real_uid == 0 for v in visits])139                self.assertLessEqual(min_homogeneity, homogeneity_scores[id])140if __name__ == "__main__":...__main__.py
Source:__main__.py  
1from lib.KubeManage import KubernetesManagment2from lib.util import *3from docs.__docs__ import *4import subprocess5################################################################################6##############				   Master Values				 #################7################################################################################8sys.path.insert(0, os.path.abspath('.'))9#Before we load the menu, we need to do some checks10# The .env needs to be reloaded in the case of other alterations11#12# Where the terminal is located when you run the file13PWD = os.path.realpath(".")14#PWD_LIST = os.listdir(PWD)15# ohh look a global list16global PROJECT_ROOT17try:18    PROJECT_ROOT = Path(os.path.dirname(__file__))19except:20    PROJECT_ROOT = Path(os.path.dirname(PWD))21debuggreen(f"PROJECT_ROOT: {PROJECT_ROOT}")22os.environ["PROJECT_ROOT"] = str(PROJECT_ROOT.absolute())23global BIN_ROOT24BIN_ROOT=Path(PROJECT_ROOT,'bin')25os.environ["BIN_ROOT"] = str(BIN_ROOT.absolute())26debuggreen(f"BIN_ROOT: {BIN_ROOT}")27global LIB_ROOT28LIB_ROOT = Path(PROJECT_ROOT,'lib')29os.environ["LIB_ROOT"] = str(LIB_ROOT.absolute())30debuggreen(f"LIB_ROOT: {LIB_ROOT}")31global YAML_REPO32YAML_REPO = Path(LIB_ROOT,"yaml_repo")33debuggreen(f"YAML_REPO: {YAML_REPO}")34global KUBECONFIGPATH35KUBECONFIGPATH = Path(YAML_REPO,'kubeconfig.yml')36os.environ["KUBECONFIGPATH"] = str(KUBECONFIGPATH.absolute())37debuggreen(f"KUBECONFIGPATH: {KUBECONFIGPATH}")38CLUSTER_ROLE_CONFIG =         Path(YAML_REPO,"cluster_role_config.yml")39CLUSTER_ROLE_CONFIG_BINDING = Path(YAML_REPO,"cluster_role_config_binding.yml")40CLUSTER_DASHBOARD =           Path(YAML_REPO,"kubernetes_dashboard.yml")41#debuggreen(f"CLUSTER_ROLE_CONFIG :{CLUSTER_ROLE_CONFIG}")42#debuggreen(f"CLUSTER_ROLE_CONFIG :{CLUSTER_ROLE_CONFIG}")43list_of_critical_vars = ["PROJECT_ROOT","LIB_ROOT","BIN_ROOT","KUBECONFIGPATH","CLUSTER_ROLE_CONFIG","CLUSTER_ROLE_CONFIG_BINDING"]44###############################################################################45##					 Docker Information									##46###############################################################################47class KindWrapper():48    def __init__(self,kind_bin_path:Path):49        self.binary_path = str(kind_bin_path.absolute())50    51    def create_cluster(self,namespace:str,kubeconfig_path:Path):52        """53        creates a cluster with given config54        """55    56def showenv():57    '''58    Checks shell env vars for required data59    '''60    for thing in os.getenv():61        if thing in list_of_critical_vars:62            greenprint(f"[+] {thing}")63def setpath():64    '''65    Establishes paths for tooling (kind, docker, etc...)66    '''67    sys.path.insert(0,str(BIN_ROOT))68def create_certificate_authority():69    """70    Creates a certificate authority using a shell script71    """72    greenprint("[+] Generating CA data using script 'init_ca.sh'")73    subprocess.Popen(f"sh -c {PROJECT_ROOT}/init_ca.sh")74# create your own custom scripts with ease!75def establish_host(cluster_reference:KubernetesManagment):76    """77    Performs setup of a cluster, use this before creating a cluster78    """79    #setpath()80    #cluster_reference.create_service_account("cluster_admin")81    #cluster_reference82    83def init_manager() -> KubernetesManagment:84    """85    Creates an instance of KubernetesManagment class86    """87    ClusterManager = KubernetesManagment(kubeconfig=KUBECONFIGPATH,88                                         PROJECT_ROOT=PROJECT_ROOT,89                                         LIB_ROOT=LIB_ROOT,90                                         BIN_ROOT=BIN_ROOT,91                                         YAML_REPO=YAML_REPO,92                                         CLUSTER_ROLE_CONFIG=CLUSTER_ROLE_CONFIG,93                                         CLUSTER_ROLE_CONFIG_BINDING=CLUSTER_ROLE_CONFIG_BINDING94                                         )95    return ClusterManager96def init_dashboard(cluster_manager:KubernetesManagment):97    '''98    Deploys the kubernetes dashboard at99    http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/100    '''101    try:102        #apply dashboard config103        clustermanager.kubectl_apply_config(CLUSTER_DASHBOARD)104        cluster_manager.make_thefucking_dashboard_fucking_work()105        # init proxy service106        cluster_manager.cluster.kubectl("proxy")107        greenprint("[+] Dashboard is running on http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/")108    except:109        errorlogger("Cannot deploy dashboard, check the log file")110def createsandbox(cluster_manager:KubernetesManagment):111    '''112	Creates the sandbox, using kubernetes/docker113	'''114	# set environment to read from kube config directory115    #setenv({"KUBECONFIG":KUBECONFIGPATH})116	#TODO: make this117    cluster_manager.init_client()118    cluster_manager.create_cluster_role()119    cluster_manager.create_cluster_role_binding()120    cluster_manager.create_cluster()121def runsandbox(composefile):122	'''123	run a sandbox124	Args:125		composefile (str): composefile to use126	'''127	#subprocess.Popen(["docker-compose", "up", composefile])128if __name__ == "__main__":129    clustermanager = init_manager()130    init_dashboard(cluster_manager=clustermanager)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
