Best Python code snippet using lisa_python
AsyncNeoDriver.py
Source:AsyncNeoDriver.py  
...48        self.entity_id_cache = {}49        self.entity_name_cache = {}50        self.relation_cache = {}51        self.all_entity_embeddings = {}52    async def execute_async(self, query):53        payload = {'statements': [{'statement': query}]}54        try_time = 055        while try_time < 3:56            try_time += 157            try:58                async with self.session.post(self.query_url, json=payload) as response:59                    return json.loads(await response.text())['results'][0]60            except:61                await asyncio.sleep(0.1)62        return None63    def execute(self, query):64        '''65        the query running on neo4j server's command line66        '''67        return asyncio.run_coroutine_threadsafe(self.execute_async(query), self.loop)68    @staticmethod69    def process_result(result):70        try:71            res = []72            for a in result['data']:73                a['row'][0]['neoId'] = str(a['row'][1])74                res.append(a['row'][0])75            assert len(result['data']) == len(res)76            return res77        except:78            # print('error at %s val %s\n result : %s'%(property,value,result))79            return None80    def get_entities_by_matching_property(self, property, value):81        return asyncio.run_coroutine_threadsafe(82            self.get_entities_by_matching_property_async(property, value), self.loop)83    async def get_entities_by_matching_property_async(self, property, value):84        result = await self.execute_async('match (n:%s) where n.%s contains "%s" return n,id(n)' %85                                          (self.entity_label, property, value))86        return self.process_result(result)87    def get_entities_by_property(self, property, value):88        return asyncio.run_coroutine_threadsafe(89            self.get_entities_by_property_async(property, value), self.loop)90    async def get_entities_by_property_async(self, property, value):91        result = await self.execute_async('match (n) where n.%s="%s" return n,id(n)' %92                                          (property, value))93        return self.process_result(result)94    async def get_all_entities_async(self, entity_label, filter_labels):95        processed_filters = ''96        for filter_label in filter_labels:97            processed_filters += 'not n.ç±»å« contains \'%s\' and ' % (98                filter_label)99        if processed_filters != '':100            # å å»æåä¸ä¸ªand å¹¶å¨åé¢å ä¸whereè¯å¥101            processed_filters = processed_filters[:-5]102            processed_filters = 'where ' + processed_filters103        result = await self.execute_async('match (n:%s) %s return n, id(n)' %104                                          (entity_label, processed_filters))105        return self.process_result(result)106    def get_all_entities(self, entity_label, filter_label=[]):107        return asyncio.run_coroutine_threadsafe(108            self.get_all_entities_async(entity_label, filter_label), self.loop)109    async def get_entities_by_name_async(self, name):110        '''111            return : list of entities112        '''113        if self.fuzzy_index:114            '''cur_name_embedding = self.bc.encode([name])115            res = []116            for key in self.all_entity_embeddings.keys():117                res.append((key, cal_word_simi(self.all_entity_embeddings[key], cur_name_embedding)))118            res = sorted(res, key=lambda x: x[1], reversed=True)'''119            print("CALL db.index.fulltext.queryNodes('%s','%s') yield node,score return node,id(node),score" % (120                self.fuzzy_index, name))121            result = await self.execute_async("CALL db.index.fulltext.queryNodes('%s','%s') yield node,score return node,id(node),score" % (self.fuzzy_index, name))122            try:123                res = []124                thresh = 1.5125                max_score = 0126                for a in result['data']:127                    name = a['row'][0]['name']128                    if 'é»è±æºåº' in name or name == 'æºåº':129                        continue130                    a['row'][0]['neoId'] = str(a['row'][1])131                    score = a['row'][2]132                    max_score = max(max_score, score)133                    a['row'][0]['score'] = score134                    if score > thresh:135                        res.append(a['row'][0])136                res = list(filter(lambda x: x['score'] == max_score, res))137                return res138            except:139                return None140        else:141            if name in self.entity_name_cache:  # and name != 'æºåº' and 'é»è±æºåº' not in name:142                return self.entity_name_cache[name]143            else:144                r = await self.get_entities_by_property_async('name', name)145                r = list(filter(lambda x: x['name']146                                != 'é»è±æºåº' and x['name'] != 'æºåº', r))147                self.entity_name_cache[name] = r148                for entity in r:149                    neoId = entity['neoId']150                    self.entity_id_cache[neoId] = [entity]151                return r152    def get_entities_by_name(self, name):153        '''154            return : list of entities155        '''156        return asyncio.run_coroutine_threadsafe(157            self.get_entities_by_name_async(name), self.loop)158    async def get_entity_by_id_async(self, neoId):159        '''160            return : list of entities161        '''162        if neoId in self.entity_id_cache:163            return self.entity_id_cache[neoId]164        else:165            r = await self.execute_async('match (n:%s) where id(n)=%s return n,id(n)' % (self.entity_label, neoId))166            r = self.process_result(r)167            self.entity_id_cache[neoId] = r168            return r169    def get_entity_by_id(self, neoId):170        '''171            return : list of entities172        '''173        return asyncio.run_coroutine_threadsafe(174            self.get_entity_by_id_async(neoId), self.loop)175    async def get_entity_information_by_label_async(self, label):176        result = await self.execute_async('match (n:%s) return id(n), n.name, properties(n)' % (label))177        res = list(map(lambda x: x['row'], result['data']))178        return res179    def get_entity_information_by_label(self, label):180        return asyncio.run_coroutine_threadsafe(181            self.get_entity_information_by_label_async(label), self.loop182        )183    def exist_name(self, name):184        async def _exist_name(name):185            r = await self.get_entities_by_name_async(name)186            return r is not None and r != []187        return asyncio.run_coroutine_threadsafe(_exist_name(name), self.loop)188    async def get_genres_by_relation_async(self, f_genre, c_genre, f_name, reverse=False,relation=None):189        '''190            return : list of triples (relation, entity name, entity neoId)191        '''192        arrow1 = ''193        arrow2 = ''194        if reverse:195            arrow1 = '<'196        else:197            arrow2 = '>'198        if relation:199            r_name = ':'+relation200        else:201            r_name = ''202        neo4j = 'match (n:%s)%s-[r%s]-%s(m:%s) where n.name =\'%s\' return distinct m, id(m)'% (f_genre, arrow1, r_name, arrow2, c_genre, f_name)203        res = await self.execute_async(neo4j)204        # res = [x['name'] for x in result]205        # res = list(map(lambda x: tuple(x['row']), result['data']))206        # self.relation_cache[query_tuple] = res207        return self.process_result(res)208    def get_genres_by_relation(self, f_genre, c_genre, f_name, reverse=False,relation=None):209        '''210            return : list of triples (relation, entity name, entity neoId)211        '''212        return asyncio.run_coroutine_threadsafe(213            self.get_genres_by_relation_async(f_genre, c_genre, f_name, reverse,relation), self.loop)214    async def get_relations_by_id_async(self, id, l_label, r_label, direction=''):215        '''216            return : list of triples (relation, entity name, entity neoId)217        '''218        query_tuple = (id, direction, l_label, r_label)219        if query_tuple in self.relation_cache:220            return self.relation_cache[query_tuple]221        else:222            if l_label != '':223                l_label = ':' + l_label224            if r_label != '':225                r_label = ':' + r_label226            arrow1 = ''227            arrow2 = ''228            if direction == '<':229                arrow1 = '<'230            elif direction == '>':231                arrow2 = '>'232            result = await self.execute_async('match (n%s)%s-[r]-%s(m%s) where id(n)=%s return type(r), m.name, id(m)'233                                              % (l_label, arrow1, arrow2, r_label, id))234            res = list(map(lambda x: list(x['row']), result['data']))235            self.relation_cache[query_tuple] = res236            return res237    def get_relations_by_id(self, id, l_label, r_label, direction=''):238        '''239            return : list of triples (relation, entity name, entity neoId)240        '''241        return asyncio.run_coroutine_threadsafe(242            self.get_relations_by_id_async(id, l_label, r_label, direction), self.loop)243    async def get_labels_by_name_async(self, name, type = "Instance"):244        sql = 'match (n:%s) where n.åç§°="%s"  or n.åç§°=~".*%s.*" return labels(n), id(n)' % (type, name, name)245        result = await self.execute_async(sql)246        res = list(map(lambda x: x['row'], result['data']))247        return res248    def get_labels_by_name(self, name):249        return asyncio.run_coroutine_threadsafe(250            self.get_labels_by_name_async(name), self.loop)251    async def get_label_by_id_async(self, id):252        result = await self.execute_async('match (n) where id(n) = %d return labels(n)' % (id))253        res = list(map(lambda x: x['row'][0][0], result['data']))254        return res255    def get_label_by_id(self, id):256        return asyncio.run_coroutine_threadsafe(257            self.get_label_by_id_async(id), self.loop)258    async def get_name_by_id_async(self, id):259        result = await self.execute_async('match (n) where id(n) = %d return n.name' % (id))260        res = list(map(lambda x: x['row'][0], result['data']))261        return res262    def get_name_by_id(self, id):263        return asyncio.run_coroutine_threadsafe(264            self.get_name_by_id_async(id), self.loop)265    async def get_props_in_entity_async(self, name, label):266        result = await self.execute_async('match (n:%s) where n.name = \'%s\' return properties(n)' % (label, name))267        res = list(map(lambda x: x['row'][0], result['data']))[0]268        return res269    def get_props_in_entity(self, name, label):270        res = asyncio.run_coroutine_threadsafe(271            self.get_props_in_entity_async(name, label), self.loop)272        return res273    async def get_props_by_id_async(self, neoId, label):274        result = await self.execute_async('match (n:%s) where id(n) = %d return properties(n)' % (label, neoId))275        res = list(map(lambda x: x['row'][0], result['data']))[0]276        return res277    def get_props_by_id(self, neoId, label):278        res = asyncio.run_coroutine_threadsafe(279            self.get_props_by_id_async(neoId, label), self.loop)280        return res281    # DPQA282    async def get_props_by_id_dpqa_async(self, neoId):283        result = await self.execute_async('match (n) where id(n) = %d return properties(n)' % (neoId))284        res = list(map(lambda x: x['row'][0], result['data']))[0]285        return res286    def get_props_by_id_dpqa(self, neoId):287        res = asyncio.run_coroutine_threadsafe(288            self.get_props_by_id_dpqa_async(neoId), self.loop)289        return res290    # Merchant291    async def get_entities_by_genre_async(self, genre):292        '''293            return : list of entities294        '''295        result = await self.execute_async('match (n:%s) return n,id(n)' % (genre))296        res = self.process_result(result)297        return res298    def get_entities_by_genre(self, genre):299        '''300            return : list of entities301        '''302        return asyncio.run_coroutine_threadsafe(303            self.get_entities_by_genre_async(genre), self.loop)304    async def get_entities_by_genre_and_name_async(self, genre, name):305        '''306            return : list of entities307        '''308        result = await self.execute_async('match (n:%s) where n.name = \'%s\' return n,id(n), properties(n)' % (genre, name))309        res = list(map(lambda x: x['row'], result['data']))310        return res311    def get_entities_by_genre_and_name(self, genre, name):312        '''313            return : list of entities314        '''315        return asyncio.run_coroutine_threadsafe(316            self.get_entities_by_genre_and_name_async(genre, name), self.loop)317    async def get_merchandise_entities_by_genre_async(self, genre):318        '''319            return : list of entities320        '''321        result = await self.execute_async('match (n:%s) where exists(n.æå¡å
容) return properties(n)' % (genre))322        res = list(map(lambda x: x['row'], result['data']))323        return res324    def get_merchandise_entities_by_genre(self, genre):325        '''326            return : list of entities327        '''328        return asyncio.run_coroutine_threadsafe(329            self.get_merchandise_entities_by_genre_async(genre), self.loop)330    @lru_cache(maxsize=256)331    def get_instance_of_genre(self, genre_name, genre='SubGenre'):332        # neo4j_sql = 'match (n:Instance)-[:å±äº]->(m:%s {name:"%s"}) return distinct n,id(n)' % (genre, genre_name)333        neo4j_sql = 'match (n:Instance)-[:å±äº]->(m {name:"%s"}) return distinct n,id(n)' % ( genre_name)  # zsh èªç©ºçé®ççç±»å为SubGenre|SubGenre_childï¼æä»¥ç´æ¥æ¾genre_nameä¸çèç¹ï¼ä¸ç»m设置类åäº334        result = self.execute(neo4j_sql).result()335        result = self.process_result(result)...indexFind.py
Source:indexFind.py  
...14        followers_index = "CREATE INDEX IF NOT EXISTS followers_count ON tweets (user_followers_count);"15        friends_index = "CREATE INDEX IF NOT EXISTS friends_count ON tweets (user_friends_count);"16        # Statement.ConsistencyLevel = ConsistencyLevel.ALL  # more than one response ) to ensure no data is missing from querying replicas17        # session.execute(lang_index)18        session.execute_async(date_index) 19        session.execute_async(location_index)20        session.execute_async(followers_index)21        session.execute_async(friends_index)22        find_withIndex1 = "SELECT tweet_id from tweets WHERE lang = 'en' and token(tweet_id) >= -9223372036854775808 ALLOW FILTERING;"23        find_withIndex2 = "SELECT user_location from tweets WHERE user_location= 'London' and token(tweet_id) >= -9223372036854775808 ALLOW FILTERING;"24        find_withIndex3 = "SELECT user_followers_count from tweets WHERE user_followers_count >= 1000 and token(tweet_id) >= -9223372036854775808 ALLOW FILTERING;"25        find_withIndex4 = "SELECT user_friends_count from tweets WHERE user_friends_count >= 1000 and token(tweet_id) >= -9223372036854775808 ALLOW FILTERING;"26        # find_withIndex = "SELECT count(tweet_id) WHERE lang = 'en' AND user.location = 'London' \27        #               AND user.followers_count>=1000 AND user.friends_count>=1000 and created_at < toTimestamp(now()) ALLOW FILTERING;"  # LIMIT 1000000000"28        iter_durations = []29        while iterations > 0:30            # session.execute("ALTER TABLE tweets WITH GC_GRACE_SECONDS = 0;")  # To clear tombstone buffer for single node - so that the query can be able to get results without exceeding limitations.31            rows = 032            duration = 0.033            print("\nIteration: ",iterations)34            try:35                # lang = en and friends_count > 100 and followers > 100 36                start1 = time.time()37                result_1 = session.execute_async (find_withIndex1)38                r1 = len(list(result_1.result()))39                duration = time.time() - start140                start2 = time.time()41                result_1 = session.execute_async (find_withIndex2)42                rows_2 = len(list(result_2.result()))43                duration = time.time() - start244                start3 = time.time()45                result_1 = session.execute_async (find_withIndex3)46                rows_3 = len(list(result_3.result()))47                duration = time.time() - start348                start4 = time.time()49                result_4 = session.execute_async (find_withIndex4)50                rows_4 = len(list(result_4.result()))51                duration += time.time() - start452                # rows = len(rows_returned1.current_rows)+len(rows_returned2.current_rows)+len(rows_returned3.current_rows)+len(rows_returned4.current_rows)53                rows = rows_1 + rows_2 + rows_3 + rows_454            except Exception as error:55                print("\nSELECT stm error : " + error.__str__())56            print ("  Rows returned: ", rows)57            print ("  Elapsed time : ", duration)58            iter_durations.append( duration )59            iterations -= 160        avg = sum(iter_durations) / float( max(len(iter_durations), 1) )61        with open("results/iterations.json","a+") as bulk_f:62            json.dump({"dataset":jsn_dataset, "findByIndex": iter_durations,"average":avg}, bulk_f)63            bulk_f.write("\n")64        print("%i iterations: %s " % (iterations, str(iter_durations)) )65    except Exception as error:66        print("\nFind by Index error : \n" + error.__str__())67    try:68        # session.execute("DROP INDEX IF EXISTS lang_index;")69        session.execute_async("DROP INDEX IF EXISTS date_index;")70        session.execute_async("DROP INDEX IF EXISTS location_index;")71        session.execute_async("DROP INDEX IF EXISTS followers_count;")72        session.execute_async("DROP INDEX IF EXISTS friends_count;")73    except Exception as error:74        print("\nDROP INDEX stm error : " + error.__str__())75    # print( "  %-22s %-34s %-20s %-5s %-5s" % ("tweet_id", "created_at", "user.screen_name", "lang", "text") )...test_periodic.py
Source:test_periodic.py  
...33        with Timer() as t:34            while self.task.counter <= 0 and t.elapsed < 3.0:35                time.sleep(0.101)36        self.assertGreater(self.task.counter, 0)37    def test_execute_async(self):38        f = self.task.execute_async()39        res = f.result(3.0)40        self.assertNotNone(res)41        self.assertGreater(res, 0)42        # Verify exception path43        self.task.fail_async = True44        with self.assertRaises(Exception) as ctx:45            # Call this twice, since there's a race condition where setting46            # fail_async and getting the future from execute_async is called47            # when execute is between the self.fail_async check and the return48            self.task.execute_async().result(1.0)49            self.task.execute_async().result(1.0)50        self.assertEqual(ctx.exception.args[0], "fail_async")51        # Wait until task shuts down52        with Timer() as t:53            while t.elapsed < 5.0:54                if not self.task.running:55                    break56            self.assertFalse(self.task.running)57        # Verify early exception on async_execute against shutdown tasks58        f = self.task.execute_async()59        with self.assertRaises(Exception) as ctx:60            f.result()61        self.assertEqual(ctx.exception.args[0], "Worker not running")62    def test_trylater_after(self):63        t = self.task64        # Set up some mocks65        t._handle_try_later = self.mock.Mock(wraps=t._handle_try_later)66        t.stop_event = self.mock.Mock(wraps=t.stop_event)67        # Run once with no TryLater68        t.execute_async().result()69        self.assertFalse(t._handle_try_later.called)70        # Run once with no TryLater71        t.trylater = 0.0172        # Wait for it to get called once73        t0 = time.time()74        while not t._handle_try_later.called and time.time() - t0 < 5.0:75            time.sleep(0.01)76        self.assertTrue(t._handle_try_later)77        # Disable trylater and wait for one successful completion78        t.trylater = None79        t.execute_async().result(5.0)80        # Make sure things were called with good values81        self.assertTrue(t._handle_try_later)82        self.task.stop_event.wait.assert_any_call(0.01)83class MyMultiTask(MyTask):84    workers = 585class TestMultiTask(SingleTaskTestCase):86    TASK = MyMultiTask87    def test_multi_execute(self):88        with Timer() as t:89            while len(self.task.visit_threads) < 5 and t.elapsed < 3.0:90                time.sleep(0.101)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
