How to use the list_domain_names method in LocalStack

Best Python code snippet using localstack_python
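In boto3, list_domain_names is an operation on the Elasticsearch Service client (service name "es"; the newer "opensearch" client exposes the same call), and LocalStack emulates it locally. The only change from real AWS is pointing the client at LocalStack's endpoint. A minimal sketch, assuming LocalStack is running on its default edge port 4566 and that dummy credentials suffice:

import boto3

# Endpoint URL and dummy credentials are assumptions based on
# LocalStack's default configuration (edge port 4566).
client = boto3.client(
    "es",
    region_name="us-east-1",
    endpoint_url="http://localhost:4566",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

# list_domain_names takes no required arguments and returns a dict
# with a "DomainNames" list of {"DomainName": ...} entries.
response = client.list_domain_names()
for entry in response["DomainNames"]:
    print(entry["DomainName"])

Note that list_domain_names returns every domain name in a single response; there is no pagination token in this API. Of the snippets below, the first two only define their own list_domain_names variables in unrelated data-processing scripts; the third, es.py, is the one that actually calls the boto3 method.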

get_cocitation_network.py

Source: get_cocitation_network.py (GitHub)


import ast
import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
import prince
import plotly.express as px
import numpy as np
import time
import networkx as nx
from matplotlib import pyplot as plt
from pandas.api.types import CategoricalDtype
from ural import get_domain_name
from utils import (import_data,
                   save_data,
                   save_figure,
                   save_numpy_array,
                   save_list)
from create_twitter_users_lists import get_lists_and_followers
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA


def add_type(var1, var2, df):
    # Label each user as scientist / activist / delayer based on curated lists.
    list_scientists, list_activists, list_delayers, df_followers = get_lists_and_followers()
    df[var1] = ''
    df[var1] = np.where(df[var2].isin(list_scientists), 'scientist', df[var1])
    df[var1] = np.where(df[var2].isin(list_activists), 'activist', df[var1])
    df[var1] = np.where(df[var2].isin(list_delayers), 'delayer', df[var1])
    return df


def intersection(lst1, lst2):
    return [value for value in lst1 if value in lst2]


def get_tweets():
    df = import_data('twitter_data_climate_tweets_2022_03_15.csv')
    df = df[~df['query'].isin(['f'])]
    df = add_type('type', 'username', df)
    df['hashtags'] = df['hashtags'].str.lower()
    print('number of tweets', len(df))
    return df


def aggregate_domains_per_user():
    df = get_tweets()
    # 'domain_name' is stored as a stringified list; parse it back into a list.
    for index, row in df.iterrows():
        df.at[index, 'domain_name'] = ast.literal_eval(row['domain_name'])
    u = df.groupby(['username', 'type'])['domain_name'].apply(list).reset_index(name='list_domain_names')
    #u['list_domain_names'] = u['list_domain_names'].apply(lambda list_items: list({x for l in list_items for x in l}))
    u['list_domain_names'] = u['list_domain_names'].apply(lambda list_items: [x for l in list_items for x in l])
    list_platforms = ['twitter.com', 'youtube.com', 'bit.ly']
    u['list_domain_names'] = u['list_domain_names'].apply(lambda list_items: [x for x in list_items if x not in list_platforms])
    u['len_list'] = u['list_domain_names'].apply(len)
    u = u.sort_values(by='type')
    print(u.info())
    print(u['len_list'].describe())
    return u


def get_cocitation(limit_cocitations):
    timestr = time.strftime('%Y_%m_%d')
    df = aggregate_domains_per_user()
    list_individuals = df['username'].tolist()
    save_list(list_individuals, 'list_individuals_cocitations.txt')
    print(list_individuals[0:10])
    n = len(list_individuals)
    print('number of individuals', n)
    matrix = np.zeros((n, n))
    matrix_lim = np.zeros((n, n))
    for user_i in list_individuals:
        print(user_i)
        for user_j in list_individuals:
            if user_i != user_j:
                i = df.index[df['username'] == user_i]
                j = df.index[df['username'] == user_j]
                a = intersection(df['list_domain_names'].iloc[i[0]], df['list_domain_names'].iloc[j[0]])
                matrix[i, j] = len(a)
                matrix[i, i] = len(df['list_domain_names'].iloc[i[0]])
                matrix_lim[i, i] = 0
                if matrix[i, j] > limit_cocitations:
                    matrix_lim[i, j] = 1
                else:
                    matrix_lim[i, j] = 0
    save_numpy_array(matrix, 'cocitations_{}.npy'.format(timestr))
    #print(matrix)
    #print(matrix_lim[1,:])
    s = np.sum(matrix_lim, axis=1)
    G = nx.from_numpy_matrix(matrix_lim)  # removed in networkx 3.0; use nx.from_numpy_array there
    print(G.nodes)
    for index, row in df.iterrows():
        G.nodes[index]['type'] = row['type']
        G.nodes[index]['username'] = row['username']
    nx.write_gexf(G, './data/{}_network_climate_cocitations_{}.gexf'.format(limit_cocitations, timestr))
    #print(G.nodes[0]['type'])
    n_zeros = np.count_nonzero(s == 0)
    print(s)
    print(len(s))
    print(n_zeros)
    return matrix


def to_1D(series):
    return pd.Series([x for _list in series for x in _list], name='hashtag_count')


def get_hashtags(limit_occurence):
    df = get_tweets()
    df['hashtags'] = df['hashtags'].apply(eval)
    df['len_hashtags'] = df['hashtags'].apply(len)
    series = to_1D(df['hashtags']).value_counts()
    df1 = series.to_frame()
    print(df1['hashtag_count'].describe())
    df1.index.name = 'hashtags'
    #df1['hashtags'] = df1['hashtags'].str.lower()
    df1 = df1.reset_index(level=0)
    df1 = df1[df1['hashtag_count'] > limit_occurence]
    print(df1['hashtags'].head(20))
    return df1


def get_hashtags_by_type():
    df = get_tweets()
    df = df[['username', 'hashtags', 'type_of_tweet', 'id', 'text', 'followers_count', 'type']]
    a = len(df[df['type'].isin(['activist'])])
    b = len(df[df['type'].isin(['delayer'])])
    c = len(df[df['type'].isin(['scientist'])])
    #df = df[~df['type_of_tweet'].isin(['replied_to'])]
    for index, row in df.iterrows():
        df.at[index, 'hashtags'] = ast.literal_eval(row['hashtags'])
    df['nb_hashtags'] = df['hashtags'].apply(len)
    print(df['nb_hashtags'].head(20))
    print('number of tw with hashtags', len(df[df['nb_hashtags'] > 0]))
    df = df.explode('hashtags')
    df = df.dropna(subset=['hashtags'])
    print(df.head(40))
    print('There are', df['hashtags'].nunique(), 'unique hashtags')
    print(df.groupby(['type'], as_index=False).size())
    df1 = df[df['nb_hashtags'] > 0].groupby(['type'], as_index=False).size()
    df1['share_tw_hashtags'] = 0
    df1['share_tw_hashtags'].iloc[0] = df1['size'].iloc[0] / a
    df1['share_tw_hashtags'].iloc[1] = df1['size'].iloc[1] / b
    df1['share_tw_hashtags'].iloc[2] = df1['size'].iloc[2] / c
    print(df1)
    print(df[df['nb_hashtags'] > 0].groupby(['type'], as_index=False).size())
    return df


if __name__ == '__main__':
    #get_cocitation(limit_cocitations = 30)
    get_hashtags(limit_occurence=50)
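The heart of get_cocitation above is a thresholded co-citation matrix: two users get an edge when they share more than limit_cocitations cited domains. A condensed sketch of the same idea, with hypothetical stand-in data instead of aggregate_domains_per_user(), using sets and positional indexing to avoid the repeated DataFrame lookups inside the double loop:

import numpy as np
import networkx as nx

# Hypothetical per-user domain sets standing in for aggregate_domains_per_user().
users = ["alice", "bob", "carol"]
domains = [
    {"nytimes.com", "nature.com"},
    {"nature.com", "bbc.co.uk"},
    {"bbc.co.uk", "nature.com", "nytimes.com"},
]

limit_cocitations = 0  # edge threshold on the number of shared domains
n = len(users)
adjacency = np.zeros((n, n))
for i in range(n):
    for j in range(i + 1, n):
        shared = len(domains[i] & domains[j])  # set intersection
        if shared > limit_cocitations:
            adjacency[i, j] = adjacency[j, i] = 1

G = nx.from_numpy_array(adjacency)
for idx, name in enumerate(users):
    G.nodes[idx]["username"] = name
print(G.edges)

nx.from_numpy_array is used here because nx.from_numpy_matrix, which the original script calls, was removed in networkx 3.0.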


data_preprocessing.py

Source: data_preprocessing.py (GitHub)


#!/usr/bin/python2
# NOTE: this script is Python 2 (print statements, raw_input, dict.iteritems).
import re, pygeoip
import server_locations


def parseXMLFile( name ):
    file = open( name )
    data = file.read()
    dataLines = data.split( '\n' )
    # Patterns to match
    dirOpenPattern = re.compile( r'<Directory .*>', re.MULTILINE )
    parentDirClosePattern = re.compile( r'^</Directory>', re.MULTILINE )
    subDirClosePattern = re.compile( r'(\s)*</Directory>', re.MULTILINE )
    regexName = re.compile( r'LocalSite .*', re.DOTALL|re.IGNORECASE )
    regexURL = re.compile( r'HttpProxy .*', re.DOTALL|re.IGNORECASE )
    parentDirFlag = False
    subDirFlag = False
    foundLocalSite = False
    foundURLs = False
    parentDirName = ''
    subDirName = ''
    list_domain_names = []
    serverNamesDict = {}

    for line in dataLines:
        if not parentDirFlag:
            dirOpenMatch = dirOpenPattern.findall( line )
            if dirOpenMatch:
                parentDirFlag = True
                parentDirName = dirOpenMatch[0].split( '"' )[1][1:3]
            continue
        if not subDirFlag and parentDirFlag:
            assert parentDirFlag
            # Each ParentDir has at least one SubDir
            dirOpenMatch = dirOpenPattern.findall( line )
            if dirOpenMatch:
                subDirFlag = True
            continue
        if not foundLocalSite and subDirFlag:
            assert subDirFlag and parentDirFlag
            matchName = regexName.findall( line )
            if matchName:
                foundLocalSite = True
                x = matchName[0]
                x = x.split()[-1][1:-1]
                subDirName = parentDirName + '/' + x
            continue
        if not foundURLs and foundLocalSite:
            assert foundLocalSite and subDirFlag and parentDirFlag
            list_domain_URLs = []
            matchURL = regexURL.findall( line )
            if matchURL:
                foundURLs = True
                matchURL = matchURL[0]
                pattern = r'http://.*?:'
                regex = re.compile( pattern, re.DOTALL|re.IGNORECASE )
                match = regex.findall( matchURL )
                match = [ x[:-1] for x in match ]
                serverNamesDict[ subDirName ] = match
            continue
        subDirCloseMatch = subDirClosePattern.findall( line )
        if subDirCloseMatch:
            subDirFlag = False
            foundLocalSite = False
            foundURLs = False
        parentDirCloseMatch = parentDirClosePattern.findall( line )
        if parentDirCloseMatch:
            parentDirFlag = False
            parentDirName = ''
    return serverNamesDict


def getServerNameAddr():
    patternName = r'LocalSite .*'
    patternURL = r'HttpProxy .*'
    list_domain_URLs = []
    list_domain_names = []
    while True:
        try:
            string = raw_input()
        except EOFError:
            break
        regexName = re.compile( patternName, re.DOTALL|re.IGNORECASE )
        regexURL = re.compile( patternURL, re.DOTALL|re.IGNORECASE )
        matchName = regexName.findall( string )
        matchURL = regexURL.findall( string )
        if matchName:
            for x in matchName:
                x = x.split()[-1][1:-1]
                list_domain_names.append( x )
        if matchURL:
            for x in matchURL:
                x = x.split()[-1][1:-1]
                list_domain_URLs.append( x )
    N = len( list_domain_names )
    serverDict = {}
    # Make a dictionary from this information
    for i in range( N ):
        name = list_domain_names[ i ]
        # Getting all URLs from the URL string
        URLs = list_domain_URLs[ i ]
        pattern = r'http://.*?:'
        regex = re.compile( pattern, re.DOTALL|re.IGNORECASE )
        match = regex.findall( URLs )
        URLsFinal = []
        for x in match:
            URLsFinal.append( x[:-1] )
        serverDict[ name ] = URLsFinal
    return serverDict


def getServerLocations( serverDict ):
    gi = pygeoip.GeoIP( "/usr/local/share/GeoIP/GeoIPCity.dat",
                        pygeoip.STANDARD )
    import socket
    serverDictLocation = {}
    for name, urls in serverDict.iteritems():
        for url in urls:
            url = url[7:]  # strip the leading 'http://'
            gir = None
            try:
                gir = gi.record_by_name( url )
            except socket.gaierror:
                print 'socket error in', url
            except pygeoip.GeoIPError:
                print 'geoip error in', url
            if gir != None:
                break
        if gir != None:
            serverDictLocation[ name ] = \
                { 'url': urls, 'latitude': gir[ 'latitude' ],
                  'longitude': gir[ 'longitude' ] }
        #else:
        #    # NF = Not Found
        #    serverDictLocation[ name ] = \
        #        { 'url': urls, 'latitude': "NF", 'longitude': "NF" }
    return serverDictLocation


def main():
    # Parsing geolist.txt to get the names and addresses of all servers
    #serverDict = getServerNameAddr()
    serverDict = parseXMLFile( "geolist.txt" )
    print len(serverDict.items())
    # Get locations of all servers
    serverDictLocation = getServerLocations( serverDict )
    f = open( "location", "w" )
    for name, detail in serverDictLocation.iteritems():
        location = ( detail[ 'longitude' ], detail[ 'latitude' ] )
        urls = detail[ 'url' ]
        urlStr = ''
        for url in urls:
            urlStr = urlStr + ',' + url
        f.write( name + ' ' + str( location[0] ) + ' ' +
                 str( location[1] ) + ' ' + urlStr + '\n' )
    f.close()
    print len( serverDictLocation.items() )

    #server_locations.main( PointsMap )


if __name__ == "__main__":
    ...
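The file above is Python 2 throughout, and the pygeoip library it relies on is unmaintained; its modern replacement is the geoip2 package with MaxMind's GeoLite2 databases. A rough Python 3 equivalent of the per-hostname lookup step, where the database path and hostname are placeholder assumptions:

import socket
import geoip2.database  # pip install geoip2
import geoip2.errors

def locate(hostname, reader):
    # Resolve the hostname and return (latitude, longitude), or None on failure.
    try:
        ip = socket.gethostbyname(hostname)
        record = reader.city(ip)
        return record.location.latitude, record.location.longitude
    except (socket.gaierror, geoip2.errors.AddressNotFoundError):
        return None

# The database path and hostname below are placeholders; adjust to your setup.
with geoip2.database.Reader("/usr/local/share/GeoIP/GeoLite2-City.mmdb") as reader:
    print(locate("example.com", reader))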


es.py

Source: es.py (GitHub)


...
        else:
            self.client = sess.client(self.SERVICE, region_name=region)

    def get(self):
        resources = {}
        list_domain_names = self.client.list_domain_names()
        for es in list_domain_names["DomainNames"]:
            domain = self.client.describe_elasticsearch_domain(
                DomainName=es["DomainName"]
            )

            self.logger.debug("Inspecting ES domain %s", es["DomainName"])

            if domain["DomainStatus"]["Created"] == False:
                self.logger.debug(
                    "Skipping ES domain as it's still being provisioned"
                )
                continue

            if domain["DomainStatus"]["Deleted"] == True:
                self.logger.debug(
...
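This excerpt is the one that actually uses the API from the title: it lists all domains, then calls describe_elasticsearch_domain on each and skips domains that are still provisioning (Created is False) or already deleted. To reproduce the flow against LocalStack, a domain has to exist first; a sketch, assuming the same local endpoint as in the introduction and a hypothetical domain name:

import boto3

client = boto3.client(
    "es",
    region_name="us-east-1",
    endpoint_url="http://localhost:4566",  # LocalStack edge port (assumed default)
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

# Create a domain so that list_domain_names has something to return;
# "my-test-domain" is a hypothetical name.
client.create_elasticsearch_domain(DomainName="my-test-domain")

for entry in client.list_domain_names()["DomainNames"]:
    status = client.describe_elasticsearch_domain(
        DomainName=entry["DomainName"]
    )["DomainStatus"]
    print(entry["DomainName"], "created:", status["Created"])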


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run LocalStack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud
